Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add defer/stream support for subscriptions #7

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions src/subscription/__tests__/flattenAsyncIterator-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import flattenAsyncIterator from '../flattenAsyncIterator';

describe('flattenAsyncIterator', () => {
it('does not modify an already flat async generator', async () => {
async function* source() {
yield 1;
yield 2;
yield 3;
}

const result = flattenAsyncIterator(source());

expect(await result.next()).to.deep.equal({ value: 1, done: false });
expect(await result.next()).to.deep.equal({ value: 2, done: false });
expect(await result.next()).to.deep.equal({ value: 3, done: false });
expect(await result.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('does not modify an already flat async iterator', async () => {
const items = [1, 2, 3];

const iterator: any = {
[Symbol.asyncIterator]() {
return this;
},
next() {
return Promise.resolve({
done: items.length === 0,
value: items.shift(),
});
},
};

const result = flattenAsyncIterator(iterator);

expect(await result.next()).to.deep.equal({ value: 1, done: false });
expect(await result.next()).to.deep.equal({ value: 2, done: false });
expect(await result.next()).to.deep.equal({ value: 3, done: false });
expect(await result.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('flatten nested async generators', async () => {
async function* source() {
yield 1;
yield 2;
yield (async function* (): AsyncGenerator<number, void, void> {
yield 2.1;
yield 2.2;
})();
yield 3;
}

const doubles = flattenAsyncIterator(source());

const result = [];
for await (const x of doubles) {
result.push(x);
}
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 3]);
});

it('allows returning early from a nested async generator', async () => {
async function* source() {
yield 1;
yield 2;
yield (async function* (): AsyncGenerator<number, void, void> {
yield 2.1;
// istanbul ignore next (Shouldn't be reached)
yield 2.2;
})();
// istanbul ignore next (Shouldn't be reached)
yield 3;
}

const doubles = flattenAsyncIterator(source());

expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });

// Early return
expect(await doubles.return()).to.deep.equal({
value: undefined,
done: true,
});

// Subsequent next calls
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('allows throwing errors from a nested async generator', async () => {
async function* source() {
yield 1;
yield 2;
yield (async function* (): AsyncGenerator<number, void, void> {
yield 2.1;
// istanbul ignore next (Shouldn't be reached)
yield 2.2;
})();
// istanbul ignore next (Shouldn't be reached)
yield 3;
}

const doubles = flattenAsyncIterator(source());

expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });

// Throw error
let caughtError;
try {
await doubles.throw('ouch');
} catch (e) {
caughtError = e;
}
expect(caughtError).to.equal('ouch');
});
});
147 changes: 147 additions & 0 deletions src/subscription/__tests__/subscribe-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,153 @@ describe('Subscription Publish Phase', () => {
});
});

it('produces additional payloads for subscriptions with @defer', async () => {
const pubsub = new SimplePubSub();
const subscription = await createSubscription(
pubsub,
emailSchema,
parse(`
subscription ($priority: Int = 0) {
importantEmail(priority: $priority) {
email {
from
subject
}
... @defer {
inbox {
unread
total
}
}
}
}
`),
);
invariant(isAsyncIterable(subscription));
// Wait for the next subscription payload.
const payload = subscription.next();

// A new email arrives!
expect(
pubsub.emit({
from: '[email protected]',
subject: 'Alright',
message: 'Tests are good',
unread: true,
}),
).to.equal(true);

// The previously waited on payload now has a value.
expect(await payload).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: '[email protected]',
subject: 'Alright',
},
},
},
hasNext: true,
},
});

// Wait for the next payload from @defer
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
inbox: {
unread: 1,
total: 2,
},
},
path: ['importantEmail'],
hasNext: false,
},
});

// Another new email arrives, after all incrementally delivered payloads are received.
expect(
pubsub.emit({
from: '[email protected]',
subject: 'Tools',
message: 'I <3 making things',
unread: true,
}),
).to.equal(true);

// The next waited on payload will have a value.
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: '[email protected]',
subject: 'Tools',
},
},
},
hasNext: true,
},
});

// Another new email arrives, before the incrementally delivered payloads from the last email was received.
expect(
pubsub.emit({
from: '[email protected]',
subject: 'Important',
message: 'Read me please',
unread: true,
}),
).to.equal(true);

// Deferred payload from previous event is received.
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
inbox: {
unread: 2,
total: 3,
},
},
path: ['importantEmail'],
hasNext: false,
},
});

// Next payload from last event
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: '[email protected]',
subject: 'Important',
},
},
},
hasNext: true,
},
});

// The client disconnects before the deferred payload is consumed.
expect(await subscription.return()).to.deep.equal({
done: true,
value: undefined,
});

// Awaiting a subscription after closing it results in completed results.
expect(await subscription.next()).to.deep.equal({
done: true,
value: undefined,
});
});

it('produces a payload when there are multiple events', async () => {
const pubsub = new SimplePubSub();
const subscription = await createSubscription(pubsub);
Expand Down
49 changes: 49 additions & 0 deletions src/subscription/flattenAsyncIterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { SYMBOL_ASYNC_ITERATOR } from '../polyfills/symbols';

import isAsyncIterable from '../jsutils/isAsyncIterable';

/**
* Given an AsyncIterable that could potentially yield other async iterators,
* flatten all yielded results into a single AsyncIterable
*/
export default function flattenAsyncIterator<T>(
iterable: AsyncGenerator<AsyncGenerator<T, void, void> | T, void, void>,
): AsyncGenerator<T, void, void> {
// $FlowFixMe[prop-missing]
const iteratorMethod = iterable[SYMBOL_ASYNC_ITERATOR];
const iterator: any = iteratorMethod.call(iterable);
let iteratorStack: Array<AsyncGenerator<T, void, void>> = [iterator];

function next(): Promise<IteratorResult<T, void>> {
const currentIterator = iteratorStack[0];
if (!currentIterator) {
return Promise.resolve({ value: undefined, done: true });
}
return currentIterator.next().then((result) => {
if (result.done) {
iteratorStack.shift();
return next();
} else if (isAsyncIterable(result.value)) {
const childIteratorMethod = result.value[SYMBOL_ASYNC_ITERATOR];
const childIterator: any = childIteratorMethod.call(result.value);
iteratorStack.unshift(childIterator);
return next();
}
return result;
});
}
return ({
next,
return() {
iteratorStack = [];
return iterator.return();
},
throw(error?: mixed): Promise<IteratorResult<T, void>> {
iteratorStack = [];
return iterator.throw(error);
},
[SYMBOL_ASYNC_ITERATOR]() {
return this;
},
}: $FlowFixMe);
}
23 changes: 9 additions & 14 deletions src/subscription/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type { GraphQLFieldResolver } from '../type/definition';
import { getOperationRootType } from '../utilities/getOperationRootType';

import mapAsyncIterator from './mapAsyncIterator';
import flattenAsyncIterator from './flattenAsyncIterator';

export type SubscriptionArgs = {|
schema: GraphQLSchema,
Expand Down Expand Up @@ -140,8 +141,8 @@ function subscribeImpl(
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = (payload) => {
const executionResult = execute({
const mapSourceToResponse = (payload) =>
execute({
schema,
document,
rootValue: payload,
Expand All @@ -150,24 +151,18 @@ function subscribeImpl(
operationName,
fieldResolver,
});
/* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */
if (isAsyncIterable(executionResult)) {
throw new Error(
'TODO: implement support for defer/stream in subscriptions',
);
}
return executionResult;
};

// Resolve the Source Stream, then map every source value to a
// ExecutionResult value as described above.
return sourcePromise.then((resultOrStream) =>
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
isAsyncIterable(resultOrStream)
? mapAsyncIterator(
resultOrStream,
mapSourceToResponse,
reportGraphQLError,
? flattenAsyncIterator(
mapAsyncIterator(
resultOrStream,
mapSourceToResponse,
reportGraphQLError,
),
)
: ((resultOrStream: any): ExecutionResult),
);
Expand Down