Skip to content

Commit 99a85d4

Browse files
authored
introduce experimental batched streaming (#162)
1 parent 6b06772 commit 99a85d4

File tree

5 files changed

+365
-38
lines changed

5 files changed

+365
-38
lines changed

Diff for: .changeset/giant-llamas-peel.md

+2
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,5 @@
55
introduce experimental parallel streaming
66

77
Experimental `inParallel` boolean argument to the stream directive may now be used to stream list items as they are ready instead of in sequential list order.
8+
9+
When parallel streaming is enabled, the `data` property of execution patch results will consist of an array of items and a new `atIndices` property will contain the corresponding indices of the items.

Diff for: .changeset/thin-maps-boil.md

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
'graphql-executor': patch
3+
---
4+
5+
introduce experimental batched streaming
6+
7+
Experimental `maxChunkSize` and `maxInterval` arguments allow for increasing the number of items in each streamed payload up to the specified maximum size. A maximum interval (specified in milliseconds) can be used to send any ready items prior to the maximum chunk size.
8+
9+
When using a `maxChunkSize` greater than 1, the `data` property of execution patch results will consist of an array of items and a new `atIndex` property will contain the initial index for the items included within the chunk.
10+
11+
These options can be combined with parallel streaming. When streaming in parallel, the `data` property will always consist of an array of items and the `atIndices` property will always consist of an array of the matching indices, even when `maxChunkSize` is equal to 1. If these new arguments prove popular, `data` should probably be an array even when `maxChunkSize` is equal to one, even without parallel streaming.

Diff for: src/execution/__tests__/stream-test.ts

+215-26
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,31 @@ describe('Execute: stream directive', () => {
266266
},
267267
]);
268268
});
269+
it('Can stream a list field in chunks of size greater than 1', async () => {
270+
const document = parse('{ scalarList @stream(maxChunkSize: 2) }');
271+
const result = await complete(document);
272+
273+
expect(result).to.deep.equal([
274+
{
275+
data: {
276+
scalarList: [],
277+
},
278+
hasNext: true,
279+
},
280+
{
281+
data: ['apple', 'banana'],
282+
path: ['scalarList'],
283+
atIndex: 0,
284+
hasNext: true,
285+
},
286+
{
287+
data: ['coconut'],
288+
path: ['scalarList'],
289+
atIndex: 2,
290+
hasNext: false,
291+
},
292+
]);
293+
});
269294
it('Can use default value of initialCount', async () => {
270295
const document = parse('{ scalarList @stream }');
271296
const result = await complete(document);
@@ -311,7 +336,52 @@ describe('Execute: stream directive', () => {
311336
expectJSON(result).toDeepEqual({
312337
errors: [
313338
{
314-
message: 'initialCount must be a positive integer',
339+
message:
340+
'initialCount must be an integer greater than or equal to zero',
341+
locations: [
342+
{
343+
line: 1,
344+
column: 3,
345+
},
346+
],
347+
path: ['scalarList'],
348+
},
349+
],
350+
data: {
351+
scalarList: null,
352+
},
353+
});
354+
});
355+
it('maxChunkSize values less than one throw field errors', async () => {
356+
const document = parse('{ scalarList @stream(maxChunkSize: 0) }');
357+
const result = await complete(document);
358+
expectJSON(result).toDeepEqual({
359+
errors: [
360+
{
361+
message:
362+
'maxChunkSize must be an integer greater than or equal to one',
363+
locations: [
364+
{
365+
line: 1,
366+
column: 3,
367+
},
368+
],
369+
path: ['scalarList'],
370+
},
371+
],
372+
data: {
373+
scalarList: null,
374+
},
375+
});
376+
});
377+
it('maxInterval values less than zero throw field errors', async () => {
378+
const document = parse('{ scalarList @stream(maxInterval: -1) }');
379+
const result = await complete(document);
380+
expectJSON(result).toDeepEqual({
381+
errors: [
382+
{
383+
message:
384+
'maxInterval must be an integer greater than or equal to zero',
315385
locations: [
316386
{
317387
line: 1,
@@ -460,27 +530,36 @@ describe('Execute: stream directive', () => {
460530
hasNext: true,
461531
},
462532
{
463-
data: {
464-
name: 'Han',
465-
id: '2',
466-
},
467-
path: ['asyncSlowList', 1],
533+
data: [
534+
{
535+
name: 'Han',
536+
id: '2',
537+
},
538+
],
539+
path: ['asyncSlowList'],
540+
atIndices: [1],
468541
hasNext: true,
469542
},
470543
{
471-
data: {
472-
name: 'Leia',
473-
id: '3',
474-
},
475-
path: ['asyncSlowList', 2],
544+
data: [
545+
{
546+
name: 'Leia',
547+
id: '3',
548+
},
549+
],
550+
path: ['asyncSlowList'],
551+
atIndices: [2],
476552
hasNext: true,
477553
},
478554
{
479-
data: {
480-
name: 'Luke',
481-
id: '1',
482-
},
483-
path: ['asyncSlowList', 0],
555+
data: [
556+
{
557+
name: 'Luke',
558+
id: '1',
559+
},
560+
],
561+
path: ['asyncSlowList'],
562+
atIndices: [0],
484563
hasNext: false,
485564
},
486565
]);
@@ -622,6 +701,51 @@ describe('Execute: stream directive', () => {
622701
},
623702
]);
624703
});
704+
it('Can stream a field that returns an async iterable in chunks of size greater than 1', async () => {
705+
const document = parse(`
706+
query {
707+
asyncIterableList @stream(maxChunkSize: 2) {
708+
name
709+
id
710+
}
711+
}
712+
`);
713+
const result = await complete(document);
714+
expect(result).to.deep.equal([
715+
{
716+
data: {
717+
asyncIterableList: [],
718+
},
719+
hasNext: true,
720+
},
721+
{
722+
data: [
723+
{
724+
name: 'Luke',
725+
id: '1',
726+
},
727+
{
728+
name: 'Han',
729+
id: '2',
730+
},
731+
],
732+
path: ['asyncIterableList'],
733+
atIndex: 0,
734+
hasNext: true,
735+
},
736+
{
737+
data: [
738+
{
739+
name: 'Leia',
740+
id: '3',
741+
},
742+
],
743+
path: ['asyncIterableList'],
744+
atIndex: 2,
745+
hasNext: false,
746+
},
747+
]);
748+
});
625749
it('Can stream a field that returns an async iterable, using a non-zero initialCount', async () => {
626750
const document = parse(`
627751
query {
@@ -700,7 +824,8 @@ describe('Execute: stream directive', () => {
700824
expectJSON(result).toDeepEqual({
701825
errors: [
702826
{
703-
message: 'initialCount must be a positive integer',
827+
message:
828+
'initialCount must be an integer greater than or equal to zero',
704829
locations: [
705830
{
706831
line: 3,
@@ -930,10 +1055,10 @@ describe('Execute: stream directive', () => {
9301055
},
9311056
]);
9321057
});
933-
it('Handles null returned in non-null async iterable list items after initialCount is reached with parallel streaming', async () => {
1058+
it('Handles null returned in non-null async iterable list items after initialCount is reached with maxChunkSize greater than 1', async () => {
9341059
const document = parse(`
9351060
query {
936-
asyncIterableNonNullError @stream(initialCount: 0, inParallel: true) {
1061+
asyncIterableNonNullError @stream(initialCount: 0, maxChunkSize: 2) {
9371062
name
9381063
}
9391064
}
@@ -947,15 +1072,19 @@ describe('Execute: stream directive', () => {
9471072
hasNext: true,
9481073
},
9491074
{
950-
data: {
951-
name: 'Luke',
952-
},
953-
path: ['asyncIterableNonNullError', 0],
1075+
data: [
1076+
{
1077+
name: 'Luke',
1078+
},
1079+
],
1080+
path: ['asyncIterableNonNullError'],
1081+
atIndex: 0,
9541082
hasNext: true,
9551083
},
9561084
{
9571085
data: null,
958-
path: ['asyncIterableNonNullError', 1],
1086+
path: ['asyncIterableNonNullError'],
1087+
atIndex: 1,
9591088
errors: [
9601089
{
9611090
message:
@@ -971,11 +1100,71 @@ describe('Execute: stream directive', () => {
9711100
],
9721101
hasNext: true,
9731102
},
1103+
{
1104+
data: [
1105+
{
1106+
name: 'Han',
1107+
},
1108+
],
1109+
path: ['asyncIterableNonNullError'],
1110+
atIndex: 2,
1111+
hasNext: false,
1112+
},
1113+
]);
1114+
});
1115+
it('Handles null returned in non-null async iterable list items after initialCount is reached with parallel streaming', async () => {
1116+
const document = parse(`
1117+
query {
1118+
asyncIterableNonNullError @stream(initialCount: 0, inParallel: true) {
1119+
name
1120+
}
1121+
}
1122+
`);
1123+
const result = await complete(document);
1124+
expectJSON(result).toDeepEqual([
9741125
{
9751126
data: {
976-
name: 'Han',
1127+
asyncIterableNonNullError: [],
9771128
},
978-
path: ['asyncIterableNonNullError', 2],
1129+
hasNext: true,
1130+
},
1131+
{
1132+
data: [
1133+
{
1134+
name: 'Luke',
1135+
},
1136+
],
1137+
path: ['asyncIterableNonNullError'],
1138+
atIndices: [0],
1139+
hasNext: true,
1140+
},
1141+
{
1142+
data: null,
1143+
path: ['asyncIterableNonNullError'],
1144+
atIndices: [1],
1145+
errors: [
1146+
{
1147+
message:
1148+
'Cannot return null for non-nullable field Query.asyncIterableNonNullError.',
1149+
locations: [
1150+
{
1151+
line: 3,
1152+
column: 9,
1153+
},
1154+
],
1155+
path: ['asyncIterableNonNullError', 1],
1156+
},
1157+
],
1158+
hasNext: true,
1159+
},
1160+
{
1161+
data: [
1162+
{
1163+
name: 'Han',
1164+
},
1165+
],
1166+
path: ['asyncIterableNonNullError'],
1167+
atIndices: [2],
9791168
hasNext: false,
9801169
},
9811170
]);

0 commit comments

Comments
 (0)