From d5f0ca5fa7bff6ebfb073a01ff0fdea6bfae8b20 Mon Sep 17 00:00:00 2001 From: Rob Richard Date: Wed, 14 Oct 2020 19:09:04 -0400 Subject: [PATCH] Implement support for @stream directive --- src/execution/__tests__/stream-test.js | 629 ++++++++++++++++++ src/execution/execute.js | 218 +++++- .../OverlappingFieldsCanBeMergedRule-test.js | 117 ++++ .../rules/OverlappingFieldsCanBeMergedRule.js | 60 ++ 4 files changed, 1021 insertions(+), 3 deletions(-) create mode 100644 src/execution/__tests__/stream-test.js diff --git a/src/execution/__tests__/stream-test.js b/src/execution/__tests__/stream-test.js new file mode 100644 index 0000000000..8be71a0333 --- /dev/null +++ b/src/execution/__tests__/stream-test.js @@ -0,0 +1,629 @@ +import { expect } from 'chai'; +import { describe, it } from 'mocha'; + +import isAsyncIterable from '../../jsutils/isAsyncIterable'; +import { parse } from '../../language/parser'; + +import { GraphQLID, GraphQLString } from '../../type/scalars'; +import { GraphQLSchema } from '../../type/schema'; +import { GraphQLObjectType, GraphQLList } from '../../type/definition'; + +import { execute } from '../execute'; + +const friendType = new GraphQLObjectType({ + fields: { + id: { type: GraphQLID }, + name: { type: GraphQLString }, + asyncName: { + type: GraphQLString, + resolve(rootValue) { + return Promise.resolve(rootValue.name); + }, + }, + }, + name: 'Friend', +}); + +const friends = [ + { name: 'Luke', id: 1 }, + { name: 'Han', id: 2 }, + { name: 'Leia', id: 3 }, +]; + +const query = new GraphQLObjectType({ + fields: { + scalarList: { + type: new GraphQLList(GraphQLString), + resolve: () => ['apple', 'banana', 'coconut'], + }, + asyncList: { + type: new GraphQLList(friendType), + resolve: () => friends.map((f) => Promise.resolve(f)), + }, + asyncListError: { + type: new GraphQLList(friendType), + resolve: () => + friends.map((f, i) => { + if (i === 1) { + return Promise.reject(new Error('bad')); + } + return Promise.resolve(f); + }), + }, + asyncIterableList: { + type: new GraphQLList(friendType), + async *resolve() { + for (const friend of friends) { + yield friend; + } + }, + }, + asyncIterableError: { + type: new GraphQLList(friendType), + async *resolve() { + yield friends[0]; + throw new Error('bad'); + }, + }, + asyncIterableInvalid: { + type: new GraphQLList(GraphQLString), + async *resolve() { + yield friends[0].name; + yield {}; + }, + }, + asyncIterableListDelayedClose: { + type: new GraphQLList(friendType), + async *resolve() { + for (const friend of friends) { + yield friend; + } + await new Promise((r) => setTimeout(r, 1)); + }, + }, + }, + name: 'Query', +}); + +async function complete(document) { + const schema = new GraphQLSchema({ query }); + + const result = await execute(schema, document, {}); + + if (isAsyncIterable(result)) { + const results = []; + for await (const patch of result) { + results.push(patch); + } + return results; + } + return result; +} + +describe('Execute: stream directive', () => { + it('Can stream a list field', async () => { + const document = parse('{ scalarList @stream(initialCount: 0) }'); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + scalarList: [], + }, + hasNext: true, + }, + { + data: 'apple', + path: ['scalarList', 0], + hasNext: true, + }, + { + data: 'banana', + path: ['scalarList', 1], + hasNext: true, + }, + { + data: 'coconut', + path: ['scalarList', 2], + hasNext: false, + }, + ]); + }); + it('Returns label from stream directive', async () => { + const document = parse( + '{ scalarList @stream(initialCount: 1, label: "scalar-stream") }', + ); + const result = await complete(document); + + expect(result).to.deep.equal([ + { + data: { + scalarList: ['apple'], + }, + hasNext: true, + }, + { + data: 'banana', + path: ['scalarList', 1], + label: 'scalar-stream', + hasNext: true, + }, + { + data: 'coconut', + path: ['scalarList', 2], + label: 'scalar-stream', + hasNext: false, + }, + ]); + }); + it('Can disable @stream using if argument', async () => { + const document = parse( + '{ scalarList @stream(initialCount: 0, if: false) }', + ); + const result = await complete(document); + + expect(result).to.deep.equal({ + data: { scalarList: ['apple', 'banana', 'coconut'] }, + }); + }); + it('Can stream a field that returns a list of promises', async () => { + const document = parse(` + query { + asyncList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncList', 2], + hasNext: false, + }, + ]); + }); + it('Handles rejections in a field that returns a list of promises before initialCount is reached', async () => { + const document = parse(` + query { + asyncListError @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncListError', 1], + }, + ], + data: { + asyncListError: [ + { + name: 'Luke', + id: '1', + }, + null, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncListError', 2], + hasNext: false, + }, + ]); + }); + it('Handles rejections in a field that returns a list of promises after initialCount is reached', async () => { + const document = parse(` + query { + asyncListError @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncListError: [ + { + name: 'Luke', + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: null, + path: ['asyncListError', 1], + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncListError', 1], + }, + ], + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncListError', 2], + hasNext: false, + }, + ]); + }); + it('Can stream a field that returns an async iterable', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [ + { + name: 'Luke', + id: '1', + }, + { + name: 'Han', + id: '2', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Leia', + id: '3', + }, + path: ['asyncIterableList', 2], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + it('Handles error thrown in async iterable before initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableError @stream(initialCount: 2) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal({ + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableError', 1], + }, + ], + data: { + asyncIterableError: [ + { + name: 'Luke', + id: '1', + }, + null, + ], + }, + }); + }); + it('Handles error thrown in async iterable after initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableError @stream(initialCount: 1) { + name + id + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableError: [ + { + name: 'Luke', + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: null, + path: ['asyncIterableError', 1], + errors: [ + { + message: 'bad', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableError', 1], + }, + ], + hasNext: false, + }, + ]); + }); + it('Handles errors thrown by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableInvalid @stream(initialCount: 1) + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableInvalid: ['Luke'], + }, + hasNext: true, + }, + { + data: null, + path: ['asyncIterableInvalid', 1], + errors: [ + { + message: 'String cannot represent value: {}', + locations: [ + { + line: 3, + column: 9, + }, + ], + path: ['asyncIterableInvalid', 1], + }, + ], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + + it('Handles promises returned by completeValue after initialCount is reached', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 1) { + name + asyncName + } + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [ + { + name: 'Luke', + asyncName: 'Luke', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Han', + asyncName: 'Han', + }, + path: ['asyncIterableList', 1], + hasNext: true, + }, + { + data: { + name: 'Leia', + asyncName: 'Leia', + }, + path: ['asyncIterableList', 2], + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); + + it('Can @defer fields that are resolved after async iterable is complete', async () => { + const document = parse(` + query { + asyncIterableList @stream(initialCount: 1, label:"stream-label") { + ...NameFragment @defer(label: "DeferName") @defer(label: "DeferName") + id + } + } + fragment NameFragment on Friend { + name + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableList: [ + { + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Luke', + }, + path: ['asyncIterableList', 0], + label: 'DeferName', + hasNext: true, + }, + { + data: { + id: '2', + }, + path: ['asyncIterableList', 1], + label: 'stream-label', + hasNext: true, + }, + { + data: { + id: '3', + }, + path: ['asyncIterableList', 2], + label: 'stream-label', + hasNext: true, + }, + { + data: { + name: 'Han', + }, + path: ['asyncIterableList', 1], + label: 'DeferName', + hasNext: true, + }, + { + data: { + name: 'Leia', + }, + path: ['asyncIterableList', 2], + label: 'DeferName', + hasNext: false, + }, + ]); + }); + it('Can @defer fields that are resolved before async iterable is complete', async () => { + const document = parse(` + query { + asyncIterableListDelayedClose @stream(initialCount: 1, label:"stream-label") { + ...NameFragment @defer(label: "DeferName") @defer(label: "DeferName") + id + } + } + fragment NameFragment on Friend { + name + } + `); + const result = await complete(document); + expect(result).to.deep.equal([ + { + data: { + asyncIterableListDelayedClose: [ + { + id: '1', + }, + ], + }, + hasNext: true, + }, + { + data: { + name: 'Luke', + }, + path: ['asyncIterableListDelayedClose', 0], + label: 'DeferName', + hasNext: true, + }, + { + data: { + id: '2', + }, + path: ['asyncIterableListDelayedClose', 1], + label: 'stream-label', + hasNext: true, + }, + { + data: { + id: '3', + }, + path: ['asyncIterableListDelayedClose', 2], + label: 'stream-label', + hasNext: true, + }, + { + data: { + name: 'Han', + }, + path: ['asyncIterableListDelayedClose', 1], + label: 'DeferName', + hasNext: true, + }, + { + data: { + name: 'Leia', + }, + path: ['asyncIterableListDelayedClose', 2], + label: 'DeferName', + hasNext: true, + }, + { + hasNext: false, + }, + ]); + }); +}); diff --git a/src/execution/execute.js b/src/execution/execute.js index 4523b1df7c..20ef6c7980 100644 --- a/src/execution/execute.js +++ b/src/execution/execute.js @@ -53,6 +53,7 @@ import { GraphQLIncludeDirective, GraphQLSkipDirective, GraphQLDeferDirective, + GraphQLStreamDirective, } from '../type/directives'; import { isNamedType, @@ -748,6 +749,42 @@ function getDeferValues( }; } +/** + * Returns an object containing the @stream arguments if a field should be + * streamed based on the experimental flag, stream directive present and + * not disabled by the "if" argument. + */ +function getStreamValues( + exeContext: ExecutionContext, + fieldNodes: $ReadOnlyArray, +): void | {| + initialCount?: number, + label?: string, +|} { + // validation only allows equivalent streams on multiple fields, so it is + // safe to only check the first fieldNode for the stream directive + const stream = getDirectiveValues( + GraphQLStreamDirective, + fieldNodes[0], + exeContext.variableValues, + ); + + if (!stream) { + return; + } + + if (stream.if === false) { + return; + } + + return { + initialCount: + // istanbul ignore next (initialCount is required number argument) + typeof stream.initialCount === 'number' ? stream.initialCount : undefined, + label: typeof stream.label === 'string' ? stream.label : undefined, + }; +} + /** * Determines if a fragment is applicable to the given type. */ @@ -1040,6 +1077,7 @@ function completeAsyncIteratorValue( errors: Array, ): Promise<$ReadOnlyArray> { let containsPromise = false; + const stream = getStreamValues(exeContext, fieldNodes); return new Promise((resolve) => { function next(index, completedResults) { const fieldPath = addPath(path, index, undefined); @@ -1076,7 +1114,26 @@ function completeAsyncIteratorValue( return; } - next(index + 1, completedResults); + const newIndex = index + 1; + if ( + stream && + typeof stream.initialCount === 'number' && + newIndex >= stream.initialCount + ) { + exeContext.dispatcher.addAsyncIteratorValue( + stream.label, + newIndex, + path, + iterator, + exeContext, + fieldNodes, + info, + itemType, + ); + resolve(completedResults); + return; + } + next(newIndex, completedResults); }, (rawError) => { completedResults.push(null); @@ -1131,6 +1188,8 @@ function completeListValue( ); } + const stream = getStreamValues(exeContext, fieldNodes); + // This is specified as a simple map, however we're optimizing the path // where the list contains no Promises by avoiding creating another Promise. let containsPromise = false; @@ -1140,6 +1199,23 @@ function completeListValue( const itemPath = addPath(path, index, undefined); try { let completedItem; + + if ( + stream && + typeof stream.initialCount === 'number' && + index >= stream.initialCount + ) { + exeContext.dispatcher.addValue( + stream.label, + itemPath, + item, + exeContext, + fieldNodes, + info, + itemType, + ); + return; + } if (isPromise(item)) { completedItem = item.then((resolved) => completeValue( @@ -1182,7 +1258,7 @@ function completeListValue( const error = locatedError(rawError, fieldNodes, pathToArray(itemPath)); return handleFieldError(error, itemType, errors); } - }); + }).filter((val) => val !== undefined); return containsPromise ? Promise.all(completedResults) : completedResults; } @@ -1595,6 +1671,129 @@ export class Dispatcher { ); } + addValue( + label?: string, + path: Path, + promiseOrData: PromiseOrValue | mixed>, + exeContext: ExecutionContext, + fieldNodes: $ReadOnlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + ): void { + const errors = []; + this._subsequentPayloads.push( + Promise.resolve(promiseOrData) + .then((resolved) => + completeValue( + exeContext, + itemType, + fieldNodes, + info, + path, + resolved, + errors, + ), + ) + // Note: we don't rely on a `catch` method, but we do expect "thenable" + // to take a second callback for the error case. + .then(undefined, (rawError) => { + const error = locatedError(rawError, fieldNodes, pathToArray(path)); + return handleFieldError(error, itemType, errors); + }) + .then((data) => ({ + value: createPatchResult(data, label, path, errors), + done: false, + })), + ); + } + + addAsyncIteratorValue( + label?: string, + initialIndex: number, + path?: Path, + iterator: AsyncIterator, + exeContext: ExecutionContext, + fieldNodes: $ReadOnlyArray, + info: GraphQLResolveInfo, + itemType: GraphQLOutputType, + ): void { + const subsequentPayloads = this._subsequentPayloads; + function next(index) { + const fieldPath = addPath(path, index); + const patchErrors = []; + subsequentPayloads.push( + iterator.next().then( + ({ value: data, done }) => { + if (done) { + return { value: undefined, done: true }; + } + + // eslint-disable-next-line node/callback-return + next(index + 1); + + try { + const completedItem = completeValue( + exeContext, + itemType, + fieldNodes, + info, + fieldPath, + data, + patchErrors, + ); + + if (isPromise(completedItem)) { + return completedItem.then((resolveItem) => ({ + value: createPatchResult( + resolveItem, + label, + fieldPath, + patchErrors, + ), + done: false, + })); + } + + return { + value: createPatchResult( + completedItem, + label, + fieldPath, + patchErrors, + ), + done: false, + }; + } catch (rawError) { + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + handleFieldError(error, itemType, patchErrors); + return { + value: createPatchResult(null, label, fieldPath, patchErrors), + done: false, + }; + } + }, + (rawError) => { + const error = locatedError( + rawError, + fieldNodes, + pathToArray(fieldPath), + ); + handleFieldError(error, itemType, patchErrors); + return { + value: createPatchResult(null, label, fieldPath, patchErrors), + done: false, + }; + }, + ), + ); + } + next(initialIndex); + } + _race(): Promise> { return new Promise((resolve) => { this._subsequentPayloads.forEach((promise) => { @@ -1611,7 +1810,20 @@ export class Dispatcher { ); return promise; }) - .then(({ value }) => { + .then(({ value, done }) => { + if (done && this._subsequentPayloads.length === 0) { + // async iterable resolver just finished and no more pending payloads + return { + value: { + hasNext: false, + }, + done: false, + }; + } else if (done) { + // async iterable resolver just finished but there are pending payloads + // return the next one + return this._race(); + } const returnValue: ExecutionPatchResult = { ...value, hasNext: this._subsequentPayloads.length > 0, diff --git a/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js b/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js index 080f859b89..2a1982180a 100644 --- a/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js +++ b/src/validation/__tests__/OverlappingFieldsCanBeMergedRule-test.js @@ -98,6 +98,123 @@ describe('Validate: Overlapping fields can be merged', () => { `); }); + it('Same stream directives supported', () => { + expectValid(` + fragment differentDirectivesWithDifferentAliases on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "streamLabel", initialCount: 1) + } + `); + }); + + it('different stream directive label', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "anotherLabel", initialCount: 1) + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive initialCount', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream(label: "streamLabel", initialCount: 2) + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive first missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name @stream(label: "streamLabel", initialCount: 1) + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive second missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream(label: "streamLabel", initialCount: 1) + name @stream + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('mix of stream and no stream', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + + it('different stream directive both missing args', () => { + expectErrors(` + fragment conflictingArgs on Dog { + name @stream + name @stream + } + `).to.deep.equal([ + { + message: + 'Fields "name" conflict because they have differing stream directives. Use different aliases on the fields to fetch both if this was intentional.', + locations: [ + { line: 3, column: 9 }, + { line: 4, column: 9 }, + ], + }, + ]); + }); + it('Same aliases with different field targets', () => { expectErrors(` fragment sameAliasesWithDifferentFieldTargets on Dog { diff --git a/src/validation/rules/OverlappingFieldsCanBeMergedRule.js b/src/validation/rules/OverlappingFieldsCanBeMergedRule.js index 2d79dd098f..542aceee97 100644 --- a/src/validation/rules/OverlappingFieldsCanBeMergedRule.js +++ b/src/validation/rules/OverlappingFieldsCanBeMergedRule.js @@ -13,6 +13,7 @@ import type { FieldNode, ArgumentNode, FragmentDefinitionNode, + DirectiveNode, } from '../../language/ast'; import { Kind } from '../../language/kinds'; import { print } from '../../language/printer'; @@ -584,6 +585,18 @@ function findConflict( [node2], ]; } + + // istanbul ignore next (See: 'https://github.com/graphql/graphql-js/issues/2203') + const directives1 = node1.directives ?? []; + // istanbul ignore next (See: 'https://github.com/graphql/graphql-js/issues/2203') + const directives2 = node2.directives ?? []; + if (!sameStreams(directives1, directives2)) { + return [ + [responseName, 'they have differing stream directives'], + [node1], + [node2], + ]; + } } // The return type for each field. @@ -642,6 +655,53 @@ function sameArguments( }); } +function sameDirectiveArgument( + directive1: DirectiveNode, + directive2: DirectiveNode, + argumentName: string, +): boolean { + /* istanbul ignore next (See https://github.com/graphql/graphql-js/issues/2203) */ + const args1 = directive1.arguments || []; + const arg1 = find(args1, (argument) => argument.name.value === argumentName); + if (!arg1) { + return false; + } + + /* istanbul ignore next (See https://github.com/graphql/graphql-js/issues/2203) */ + const args2 = directive2.arguments || []; + const arg2 = find(args2, (argument) => argument.name.value === argumentName); + if (!arg2) { + return false; + } + return sameValue(arg1.value, arg2.value); +} + +function getStreamDirective( + directives: $ReadOnlyArray, +): ?DirectiveNode { + return find(directives, (directive) => directive.name.value === 'stream'); +} + +function sameStreams( + directives1: $ReadOnlyArray, + directives2: $ReadOnlyArray, +): boolean { + const stream1 = getStreamDirective(directives1); + const stream2 = getStreamDirective(directives2); + if (!stream1 && !stream2) { + // both fields do not have streams + return true; + } else if (stream1 && stream2) { + // check if both fields have equivalent streams + return ( + sameDirectiveArgument(stream1, stream2, 'initialCount') && + sameDirectiveArgument(stream1, stream2, 'label') + ); + } + // fields have a mix of stream and no stream + return false; +} + function sameValue(value1: ValueNode, value2: ValueNode): boolean { return print(value1) === print(value2); }