From cd97ca70cd415c3f25d5041ff969f73378c585b8 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Thu, 22 Oct 2020 19:57:11 -0400 Subject: [PATCH] add defer/stream support for subscriptions --- .../__tests__/flattenAsyncIterator-test.js | 141 +++++++++++++++++ src/subscription/__tests__/subscribe-test.js | 147 ++++++++++++++++++ src/subscription/flattenAsyncIterator.js | 49 ++++++ src/subscription/subscribe.js | 23 ++- 4 files changed, 346 insertions(+), 14 deletions(-) create mode 100644 src/subscription/__tests__/flattenAsyncIterator-test.js create mode 100644 src/subscription/flattenAsyncIterator.js diff --git a/src/subscription/__tests__/flattenAsyncIterator-test.js b/src/subscription/__tests__/flattenAsyncIterator-test.js new file mode 100644 index 00000000000..6694b523570 --- /dev/null +++ b/src/subscription/__tests__/flattenAsyncIterator-test.js @@ -0,0 +1,141 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import flattenAsyncIterator from '../flattenAsyncIterator'; + +describe('flattenAsyncIterator', () => { + it('does not modify an already flat async generator', async () => { + async function* source() { + yield 1; + yield 2; + yield 3; + } + + const result = flattenAsyncIterator(source()); + + expect(await result.next()).to.deep.equal({ value: 1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 3, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('does not modify an already flat async iterator', async () => { + const items = [1, 2, 3]; + + const iterator: any = { + [Symbol.asyncIterator]() { + return this; + }, + next() { + return Promise.resolve({ + done: items.length === 0, + value: items.shift(), + }); + }, + }; + + const result = flattenAsyncIterator(iterator); + + expect(await result.next()).to.deep.equal({ value: 1, done: false }); + expect(await result.next()).to.deep.equal({ value: 2, done: false }); + expect(await result.next()).to.deep.equal({ value: 3, done: false }); + expect(await result.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('flatten nested async generators', async () => { + async function* source() { + yield 1; + yield 2; + yield (async function* () { + yield 2.1; + yield 2.2; + yield (async function* () { + yield 2.21; + yield 2.22; + yield 2.23; + })(); + yield 2.3; + })(); + yield 3; + } + + const doubles = flattenAsyncIterator(source()); + + const result = []; + for await (const x of doubles) { + result.push(x); + } + expect(result).to.deep.equal([1, 2, 2.1, 2.2, 2.21, 2.22, 2.23, 2.3, 3]); + }); + + it('allows returning early from a nested async generator', async () => { + async function* source() { + yield 1; + yield 2; + yield (async function* () { + yield 2.1; + // istanbul ignore next (Shouldn't be reached) + yield 2.2; + })(); + // istanbul ignore next (Shouldn't be reached) + yield 3; + } + + const doubles = flattenAsyncIterator(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Early return + expect(await doubles.return()).to.deep.equal({ + value: undefined, + done: true, + }); + + // Subsequent next calls + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + expect(await doubles.next()).to.deep.equal({ + value: undefined, + done: true, + }); + }); + + it('allows throwing errors from a nested async generator', async () => { + async function* source() { + yield 1; + yield 2; + yield (async function* () { + yield 2.1; + // istanbul ignore next (Shouldn't be reached) + yield 2.2; + })(); + // istanbul ignore next (Shouldn't be reached) + yield 3; + } + + const doubles = flattenAsyncIterator(source()); + + expect(await doubles.next()).to.deep.equal({ value: 1, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2, done: false }); + expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false }); + + // Throw error + let caughtError; + try { + await doubles.throw('ouch'); + } catch (e) { + caughtError = e; + } + expect(caughtError).to.equal('ouch'); + }); +}); diff --git a/src/subscription/__tests__/subscribe-test.js b/src/subscription/__tests__/subscribe-test.js index 5df245c3e75..b7414ec1d2f 100644 --- a/src/subscription/__tests__/subscribe-test.js +++ b/src/subscription/__tests__/subscribe-test.js @@ -668,6 +668,153 @@ describe('Subscription Publish Phase', () => { }); }); + it('produces additional payloads for subscriptions with @defer', async () => { + const pubsub = new SimplePubSub(); + const subscription = await createSubscription( + pubsub, + emailSchema, + parse(` + subscription ($priority: Int = 0) { + importantEmail(priority: $priority) { + email { + from + subject + } + ... @defer { + inbox { + unread + total + } + } + } + } + `), + ); + invariant(isAsyncIterable(subscription)); + // Wait for the next subscription payload. + const payload = subscription.next(); + + // A new email arrives! + expect( + pubsub.emit({ + from: 'yuzhi@graphql.org', + subject: 'Alright', + message: 'Tests are good', + unread: true, + }), + ).to.equal(true); + + // The previously waited on payload now has a value. + expect(await payload).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'yuzhi@graphql.org', + subject: 'Alright', + }, + }, + }, + hasNext: true, + }, + }); + + // Wait for the next payload from @defer + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + inbox: { + unread: 1, + total: 2, + }, + }, + path: ['importantEmail'], + hasNext: false, + }, + }); + + // Another new email arrives, after all incrementally delivered payloads are received. + expect( + pubsub.emit({ + from: 'hyo@graphql.org', + subject: 'Tools', + message: 'I <3 making things', + unread: true, + }), + ).to.equal(true); + + // The next waited on payload will have a value. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'hyo@graphql.org', + subject: 'Tools', + }, + }, + }, + hasNext: true, + }, + }); + + // Another new email arrives, before the incrementally delivered payloads from the last email was received. + expect( + pubsub.emit({ + from: 'adam@graphql.org', + subject: 'Important', + message: 'Read me please', + unread: true, + }), + ).to.equal(true); + + // Deferred payload from previous event is received. + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + inbox: { + unread: 2, + total: 3, + }, + }, + path: ['importantEmail'], + hasNext: false, + }, + }); + + // Next payload from last event + expect(await subscription.next()).to.deep.equal({ + done: false, + value: { + data: { + importantEmail: { + email: { + from: 'adam@graphql.org', + subject: 'Important', + }, + }, + }, + hasNext: true, + }, + }); + + // The client disconnects before the deferred payload is consumed. + expect(await subscription.return()).to.deep.equal({ + done: true, + value: undefined, + }); + + // Awaiting a subscription after closing it results in completed results. + expect(await subscription.next()).to.deep.equal({ + done: true, + value: undefined, + }); + }); + it('produces a payload when there are multiple events', async () => { const pubsub = new SimplePubSub(); const subscription = await createSubscription(pubsub); diff --git a/src/subscription/flattenAsyncIterator.js b/src/subscription/flattenAsyncIterator.js new file mode 100644 index 00000000000..9c551eec1e6 --- /dev/null +++ b/src/subscription/flattenAsyncIterator.js @@ -0,0 +1,49 @@ +import { SYMBOL_ASYNC_ITERATOR } from '../polyfills/symbols'; + +import isAsyncIterable from '../jsutils/isAsyncIterable'; + +/** + * Given an AsyncIterable and a callback function, return an AsyncIterator + * which produces values mapped via calling the callback function. + */ +export default function flattenAsyncIterator( + iterable: AsyncGenerator | T, void, void>, +): AsyncGenerator { + // $FlowFixMe[prop-missing] + const iteratorMethod = iterable[SYMBOL_ASYNC_ITERATOR]; + const iterator: any = iteratorMethod.call(iterable); + let iteratorStack: Array> = [iterator]; + + function next(): Promise> { + const currentIterator = iteratorStack[0]; + if (!currentIterator) { + return Promise.resolve({ value: undefined, done: true }); + } + return currentIterator.next().then((result) => { + if (result.done) { + iteratorStack.shift(); + return next(); + } else if (isAsyncIterable(result.value)) { + const childIteratorMethod = result.value[SYMBOL_ASYNC_ITERATOR]; + const childIterator: any = childIteratorMethod.call(result.value); + iteratorStack.unshift(childIterator); + return next(); + } + return result; + }); + } + return ({ + next, + return() { + iteratorStack = []; + return iterator.return(); + }, + throw(error?: mixed): Promise> { + iteratorStack = []; + return iterator.throw(error); + }, + [SYMBOL_ASYNC_ITERATOR]() { + return this; + }, + }: $FlowFixMe); +} diff --git a/src/subscription/subscribe.js b/src/subscription/subscribe.js index ce931ef8022..8386413e3da 100644 --- a/src/subscription/subscribe.js +++ b/src/subscription/subscribe.js @@ -24,6 +24,7 @@ import type { GraphQLFieldResolver } from '../type/definition'; import { getOperationRootType } from '../utilities/getOperationRootType'; import mapAsyncIterator from './mapAsyncIterator'; +import flattenAsyncIterator from './flattenAsyncIterator'; export type SubscriptionArgs = {| schema: GraphQLSchema, @@ -140,8 +141,8 @@ function subscribeImpl( // the GraphQL specification. The `execute` function provides the // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the // "ExecuteQuery" algorithm, for which `execute` is also used. - const mapSourceToResponse = (payload) => { - const executionResult = execute({ + const mapSourceToResponse = (payload) => + execute({ schema, document, rootValue: payload, @@ -150,24 +151,18 @@ function subscribeImpl( operationName, fieldResolver, }); - /* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */ - if (isAsyncIterable(executionResult)) { - throw new Error( - 'TODO: implement support for defer/stream in subscriptions', - ); - } - return executionResult; - }; // Resolve the Source Stream, then map every source value to a // ExecutionResult value as described above. return sourcePromise.then((resultOrStream) => // Note: Flow can't refine isAsyncIterable, so explicit casts are used. isAsyncIterable(resultOrStream) - ? mapAsyncIterator( - resultOrStream, - mapSourceToResponse, - reportGraphQLError, + ? flattenAsyncIterator( + mapAsyncIterator( + resultOrStream, + mapSourceToResponse, + reportGraphQLError, + ), ) : ((resultOrStream: any): ExecutionResult), );