Skip to content

Commit

Permalink
add defer/stream support for subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
robrichard committed Oct 22, 2020
1 parent d5f0ca5 commit cd97ca7
Show file tree
Hide file tree
Showing 4 changed files with 346 additions and 14 deletions.
141 changes: 141 additions & 0 deletions src/subscription/__tests__/flattenAsyncIterator-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import { expect } from 'chai';
import { describe, it } from 'mocha';

import flattenAsyncIterator from '../flattenAsyncIterator';

describe('flattenAsyncIterator', () => {
it('does not modify an already flat async generator', async () => {
async function* source() {
yield 1;
yield 2;
yield 3;
}

const result = flattenAsyncIterator(source());

expect(await result.next()).to.deep.equal({ value: 1, done: false });
expect(await result.next()).to.deep.equal({ value: 2, done: false });
expect(await result.next()).to.deep.equal({ value: 3, done: false });
expect(await result.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('does not modify an already flat async iterator', async () => {
const items = [1, 2, 3];

const iterator: any = {
[Symbol.asyncIterator]() {
return this;
},
next() {
return Promise.resolve({
done: items.length === 0,
value: items.shift(),
});
},
};

const result = flattenAsyncIterator(iterator);

expect(await result.next()).to.deep.equal({ value: 1, done: false });
expect(await result.next()).to.deep.equal({ value: 2, done: false });
expect(await result.next()).to.deep.equal({ value: 3, done: false });
expect(await result.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('flatten nested async generators', async () => {
async function* source() {
yield 1;
yield 2;
yield (async function* () {
yield 2.1;
yield 2.2;
yield (async function* () {
yield 2.21;
yield 2.22;
yield 2.23;
})();
yield 2.3;
})();
yield 3;
}

const doubles = flattenAsyncIterator(source());

const result = [];
for await (const x of doubles) {
result.push(x);
}
expect(result).to.deep.equal([1, 2, 2.1, 2.2, 2.21, 2.22, 2.23, 2.3, 3]);
});

it('allows returning early from a nested async generator', async () => {
async function* source() {
yield 1;
yield 2;
yield (async function* () {
yield 2.1;
// istanbul ignore next (Shouldn't be reached)
yield 2.2;
})();
// istanbul ignore next (Shouldn't be reached)
yield 3;
}

const doubles = flattenAsyncIterator(source());

expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });

// Early return
expect(await doubles.return()).to.deep.equal({
value: undefined,
done: true,
});

// Subsequent next calls
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
expect(await doubles.next()).to.deep.equal({
value: undefined,
done: true,
});
});

it('allows throwing errors from a nested async generator', async () => {
async function* source() {
yield 1;
yield 2;
yield (async function* () {
yield 2.1;
// istanbul ignore next (Shouldn't be reached)
yield 2.2;
})();
// istanbul ignore next (Shouldn't be reached)
yield 3;
}

const doubles = flattenAsyncIterator(source());

expect(await doubles.next()).to.deep.equal({ value: 1, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
expect(await doubles.next()).to.deep.equal({ value: 2.1, done: false });

// Throw error
let caughtError;
try {
await doubles.throw('ouch');
} catch (e) {
caughtError = e;
}
expect(caughtError).to.equal('ouch');
});
});
147 changes: 147 additions & 0 deletions src/subscription/__tests__/subscribe-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,153 @@ describe('Subscription Publish Phase', () => {
});
});

it('produces additional payloads for subscriptions with @defer', async () => {
const pubsub = new SimplePubSub();
const subscription = await createSubscription(
pubsub,
emailSchema,
parse(`
subscription ($priority: Int = 0) {
importantEmail(priority: $priority) {
email {
from
subject
}
... @defer {
inbox {
unread
total
}
}
}
}
`),
);
invariant(isAsyncIterable(subscription));
// Wait for the next subscription payload.
const payload = subscription.next();

// A new email arrives!
expect(
pubsub.emit({
from: '[email protected]',
subject: 'Alright',
message: 'Tests are good',
unread: true,
}),
).to.equal(true);

// The previously waited on payload now has a value.
expect(await payload).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: '[email protected]',
subject: 'Alright',
},
},
},
hasNext: true,
},
});

// Wait for the next payload from @defer
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
inbox: {
unread: 1,
total: 2,
},
},
path: ['importantEmail'],
hasNext: false,
},
});

// Another new email arrives, after all incrementally delivered payloads are received.
expect(
pubsub.emit({
from: '[email protected]',
subject: 'Tools',
message: 'I <3 making things',
unread: true,
}),
).to.equal(true);

// The next waited on payload will have a value.
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: '[email protected]',
subject: 'Tools',
},
},
},
hasNext: true,
},
});

// Another new email arrives, before the incrementally delivered payloads from the last email was received.
expect(
pubsub.emit({
from: '[email protected]',
subject: 'Important',
message: 'Read me please',
unread: true,
}),
).to.equal(true);

// Deferred payload from previous event is received.
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
inbox: {
unread: 2,
total: 3,
},
},
path: ['importantEmail'],
hasNext: false,
},
});

// Next payload from last event
expect(await subscription.next()).to.deep.equal({
done: false,
value: {
data: {
importantEmail: {
email: {
from: '[email protected]',
subject: 'Important',
},
},
},
hasNext: true,
},
});

// The client disconnects before the deferred payload is consumed.
expect(await subscription.return()).to.deep.equal({
done: true,
value: undefined,
});

// Awaiting a subscription after closing it results in completed results.
expect(await subscription.next()).to.deep.equal({
done: true,
value: undefined,
});
});

it('produces a payload when there are multiple events', async () => {
const pubsub = new SimplePubSub();
const subscription = await createSubscription(pubsub);
Expand Down
49 changes: 49 additions & 0 deletions src/subscription/flattenAsyncIterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { SYMBOL_ASYNC_ITERATOR } from '../polyfills/symbols';

import isAsyncIterable from '../jsutils/isAsyncIterable';

/**
* Given an AsyncIterable and a callback function, return an AsyncIterator
* which produces values mapped via calling the callback function.
*/
export default function flattenAsyncIterator<T>(
iterable: AsyncGenerator<AsyncGenerator<T, void, void> | T, void, void>,
): AsyncGenerator<T, void, void> {
// $FlowFixMe[prop-missing]
const iteratorMethod = iterable[SYMBOL_ASYNC_ITERATOR];
const iterator: any = iteratorMethod.call(iterable);
let iteratorStack: Array<AsyncGenerator<T, void, void>> = [iterator];

function next(): Promise<IteratorResult<T, void>> {
const currentIterator = iteratorStack[0];
if (!currentIterator) {
return Promise.resolve({ value: undefined, done: true });
}
return currentIterator.next().then((result) => {
if (result.done) {
iteratorStack.shift();
return next();
} else if (isAsyncIterable(result.value)) {
const childIteratorMethod = result.value[SYMBOL_ASYNC_ITERATOR];
const childIterator: any = childIteratorMethod.call(result.value);
iteratorStack.unshift(childIterator);
return next();
}
return result;
});
}
return ({
next,
return() {
iteratorStack = [];
return iterator.return();
},
throw(error?: mixed): Promise<IteratorResult<T, void>> {
iteratorStack = [];
return iterator.throw(error);
},
[SYMBOL_ASYNC_ITERATOR]() {
return this;
},
}: $FlowFixMe);
}
23 changes: 9 additions & 14 deletions src/subscription/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type { GraphQLFieldResolver } from '../type/definition';
import { getOperationRootType } from '../utilities/getOperationRootType';

import mapAsyncIterator from './mapAsyncIterator';
import flattenAsyncIterator from './flattenAsyncIterator';

export type SubscriptionArgs = {|
schema: GraphQLSchema,
Expand Down Expand Up @@ -140,8 +141,8 @@ function subscribeImpl(
// the GraphQL specification. The `execute` function provides the
// "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
// "ExecuteQuery" algorithm, for which `execute` is also used.
const mapSourceToResponse = (payload) => {
const executionResult = execute({
const mapSourceToResponse = (payload) =>
execute({
schema,
document,
rootValue: payload,
Expand All @@ -150,24 +151,18 @@ function subscribeImpl(
operationName,
fieldResolver,
});
/* istanbul ignore if - TODO: implement support for defer/stream in subscriptions */
if (isAsyncIterable(executionResult)) {
throw new Error(
'TODO: implement support for defer/stream in subscriptions',
);
}
return executionResult;
};

// Resolve the Source Stream, then map every source value to a
// ExecutionResult value as described above.
return sourcePromise.then((resultOrStream) =>
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
isAsyncIterable(resultOrStream)
? mapAsyncIterator(
resultOrStream,
mapSourceToResponse,
reportGraphQLError,
? flattenAsyncIterator(
mapAsyncIterator(
resultOrStream,
mapSourceToResponse,
reportGraphQLError,
),
)
: ((resultOrStream: any): ExecutionResult),
);
Expand Down

0 comments on commit cd97ca7

Please sign in to comment.