Skip to content

Commit 83d9258

Browse files
authored
fix batched parallel stream error with defer (#165)
* fix batched parallel stream error with defer where the initial responseNode was not added to the list using identical helpers for all bundlers makes this code better tested all around * add changeset
1 parent f390fd0 commit 83d9258

File tree

2 files changed

+121
-85
lines changed

2 files changed

+121
-85
lines changed

.changeset/curly-onions-kiss.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'graphql-executor': patch
3+
---
4+
5+
fix batch parallel streaming in combination with deferred fragments

src/execution/executor.ts

+116-85
Original file line numberDiff line numberDiff line change
@@ -1499,6 +1499,26 @@ export class Executor {
14991499
);
15001500
}
15011501

1502+
onNewBundleContext<T extends SubsequentResponseContext>(
1503+
state: ExecutionState,
1504+
context: T,
1505+
responseNode: ResponseNode,
1506+
): T {
1507+
state.pendingPushes++;
1508+
state.pendingStreamResults--;
1509+
context.responseNodes.push(responseNode);
1510+
return context;
1511+
}
1512+
1513+
onSubsequentResponseNode<T extends SubsequentResponseContext>(
1514+
state: ExecutionState,
1515+
context: T,
1516+
responseNode: ResponseNode,
1517+
): void {
1518+
state.pendingStreamResults--;
1519+
context.responseNodes.push(responseNode);
1520+
}
1521+
15021522
createStreamContext(
15031523
exeContext: ExecutionContext,
15041524
initialCount: number,
@@ -1519,32 +1539,34 @@ export class Executor {
15191539
initialIndex: initialCount,
15201540
maxBundleSize: maxChunkSize,
15211541
maxInterval,
1522-
createDataBundleContext: (index, result) => {
1523-
exeContext.state.pendingPushes++;
1524-
exeContext.state.pendingStreamResults--;
1525-
return {
1526-
responseNodes: [result.responseNode],
1527-
parentResponseNode,
1528-
result: result.data,
1529-
atIndex: index,
1530-
};
1531-
},
1532-
createErrorBundleContext: (index, responseNode) => {
1533-
exeContext.state.pendingPushes++;
1534-
exeContext.state.pendingStreamResults--;
1535-
return {
1536-
responseNodes: [responseNode],
1537-
parentResponseNode,
1538-
atIndex: index,
1539-
};
1540-
} /* c8 ignore start */,
1542+
createDataBundleContext: (index, result) =>
1543+
this.onNewBundleContext(
1544+
exeContext.state,
1545+
{
1546+
responseNodes: [],
1547+
parentResponseNode,
1548+
result: result.data,
1549+
atIndex: index,
1550+
},
1551+
result.responseNode,
1552+
),
1553+
createErrorBundleContext: (index, responseNode) =>
1554+
this.onNewBundleContext(
1555+
exeContext.state,
1556+
{
1557+
responseNodes: [],
1558+
parentResponseNode,
1559+
atIndex: index,
1560+
},
1561+
responseNode,
1562+
) /* c8 ignore start */,
15411563
onSubsequentData: () => {
15421564
/* with maxBundleSize of 1, this function will never be called */
15431565
},
15441566
onSubsequentError: () => {
15451567
/* with maxBundleSize of 1, this function will never be called */
15461568
} /* c8 ignore stop */,
1547-
onDataBundle: (context) => {
1569+
onDataBundle: (context) =>
15481570
exeContext.publisher.queue(
15491571
context.responseNodes,
15501572
{
@@ -1554,9 +1576,8 @@ export class Executor {
15541576
label,
15551577
},
15561578
parentResponseNode,
1557-
);
1558-
},
1559-
onErrorBundle: (context) => {
1579+
),
1580+
onErrorBundle: (context) =>
15601581
exeContext.publisher.queue(
15611582
context.responseNodes,
15621583
{
@@ -1566,8 +1587,7 @@ export class Executor {
15661587
label,
15671588
},
15681589
parentResponseNode,
1569-
);
1570-
},
1590+
),
15711591
});
15721592

15731593
return {
@@ -1592,37 +1612,45 @@ export class Executor {
15921612
initialIndex: initialCount,
15931613
maxBundleSize: maxChunkSize,
15941614
maxInterval,
1595-
createDataBundleContext: (index, result) => {
1596-
exeContext.state.pendingPushes++;
1597-
exeContext.state.pendingStreamResults--;
1598-
return {
1599-
responseNodes: [result.responseNode],
1600-
parentResponseNode,
1601-
atIndices: [index],
1602-
results: [result.data],
1603-
};
1604-
},
1605-
createErrorBundleContext: (index, responseNode) => {
1606-
exeContext.state.pendingPushes++;
1607-
exeContext.state.pendingStreamResults--;
1608-
return {
1609-
responseNodes: [responseNode],
1610-
parentResponseNode,
1611-
atIndices: [index],
1612-
};
1613-
},
1615+
createDataBundleContext: (index, result) =>
1616+
this.onNewBundleContext(
1617+
exeContext.state,
1618+
{
1619+
responseNodes: [],
1620+
parentResponseNode,
1621+
atIndices: [index],
1622+
results: [result.data],
1623+
},
1624+
result.responseNode,
1625+
),
1626+
createErrorBundleContext: (index, responseNode) =>
1627+
this.onNewBundleContext(
1628+
exeContext.state,
1629+
{
1630+
responseNodes: [],
1631+
parentResponseNode,
1632+
atIndices: [index],
1633+
},
1634+
responseNode,
1635+
),
16141636
onSubsequentData: (index, result, context) => {
1615-
exeContext.state.pendingStreamResults--;
1616-
context.responseNodes.push(result.responseNode);
1637+
this.onSubsequentResponseNode(
1638+
exeContext.state,
1639+
context,
1640+
result.responseNode,
1641+
);
16171642
context.results.push(result.data);
16181643
context.atIndices.push(index);
16191644
},
16201645
onSubsequentError: (index, responseNode, context) => {
1621-
exeContext.state.pendingStreamResults--;
1622-
context.responseNodes.push(responseNode);
1646+
this.onSubsequentResponseNode(
1647+
exeContext.state,
1648+
context,
1649+
responseNode,
1650+
);
16231651
context.atIndices.push(index);
16241652
},
1625-
onDataBundle: (context) => {
1653+
onDataBundle: (context) =>
16261654
exeContext.publisher.queue(
16271655
context.responseNodes,
16281656
{
@@ -1633,9 +1661,8 @@ export class Executor {
16331661
label,
16341662
},
16351663
parentResponseNode,
1636-
);
1637-
},
1638-
onErrorBundle: (context) => {
1664+
),
1665+
onErrorBundle: (context) =>
16391666
exeContext.publisher.queue(
16401667
context.responseNodes,
16411668
{
@@ -1646,8 +1673,7 @@ export class Executor {
16461673
label,
16471674
},
16481675
parentResponseNode,
1649-
);
1650-
},
1676+
),
16511677
}),
16521678
};
16531679
}
@@ -1666,35 +1692,42 @@ export class Executor {
16661692
initialIndex: initialCount,
16671693
maxBundleSize: maxChunkSize,
16681694
maxInterval,
1669-
createDataBundleContext: (index, result) => {
1670-
exeContext.state.pendingPushes++;
1671-
exeContext.state.pendingStreamResults--;
1672-
return {
1673-
responseNodes: [],
1674-
parentResponseNode,
1675-
atIndex: index,
1676-
results: [result.data],
1677-
};
1678-
},
1679-
createErrorBundleContext: (index, responseNode) => {
1680-
exeContext.state.pendingPushes++;
1681-
exeContext.state.pendingStreamResults--;
1682-
return {
1683-
responseNodes: [responseNode],
1684-
parentResponseNode,
1685-
atIndex: index,
1686-
};
1687-
},
1695+
createDataBundleContext: (index, result) =>
1696+
this.onNewBundleContext(
1697+
exeContext.state,
1698+
{
1699+
responseNodes: [],
1700+
parentResponseNode,
1701+
atIndex: index,
1702+
results: [result.data],
1703+
},
1704+
result.responseNode,
1705+
),
1706+
createErrorBundleContext: (index, responseNode) =>
1707+
this.onNewBundleContext(
1708+
exeContext.state,
1709+
{
1710+
responseNodes: [],
1711+
parentResponseNode,
1712+
atIndex: index,
1713+
},
1714+
responseNode,
1715+
),
16881716
onSubsequentData: (_index, result, context) => {
1689-
exeContext.state.pendingStreamResults--;
1690-
context.responseNodes.push(result.responseNode);
1717+
this.onSubsequentResponseNode(
1718+
exeContext.state,
1719+
context,
1720+
result.responseNode,
1721+
);
16911722
context.results.push(result.data);
16921723
},
1693-
onSubsequentError: (_index, responseNode, context) => {
1694-
exeContext.state.pendingStreamResults--;
1695-
context.responseNodes.push(responseNode);
1696-
},
1697-
onDataBundle: (context) => {
1724+
onSubsequentError: (_index, responseNode, context) =>
1725+
this.onSubsequentResponseNode(
1726+
exeContext.state,
1727+
context,
1728+
responseNode,
1729+
),
1730+
onDataBundle: (context) =>
16981731
exeContext.publisher.queue(
16991732
context.responseNodes,
17001733
{
@@ -1705,9 +1738,8 @@ export class Executor {
17051738
label,
17061739
},
17071740
parentResponseNode,
1708-
);
1709-
},
1710-
onErrorBundle: (context) => {
1741+
),
1742+
onErrorBundle: (context) =>
17111743
exeContext.publisher.queue(
17121744
context.responseNodes,
17131745
{
@@ -1718,8 +1750,7 @@ export class Executor {
17181750
label,
17191751
},
17201752
parentResponseNode,
1721-
);
1722-
},
1753+
),
17231754
}),
17241755
),
17251756
};

0 commit comments

Comments
 (0)