Skip to content

Commit

Permalink
the IncrementalPublisher should handle response building
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jul 6, 2023
1 parent fae5da5 commit bcb8ac2
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 221 deletions.
275 changes: 183 additions & 92 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,72 @@
import { isPromise } from '../jsutils/isPromise.js';
import type { ObjMap } from '../jsutils/ObjMap.js';
import type { Path } from '../jsutils/Path.js';
import { pathToArray } from '../jsutils/Path.js';
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

import type {
GraphQLError,
GraphQLFormattedError,
} from '../error/GraphQLError.js';

/**
* The result of GraphQL execution.
*
* - `errors` is included when any errors occurred as a non-empty array.
* - `data` is the result of a successful execution of the query.
* - `hasNext` is true if a future payload is expected.
* - `extensions` is reserved for adding non-standard properties.
* - `incremental` is a list of the results from defer/stream directives.
*/
export interface ExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLError>;
data?: TData | null;
extensions?: TExtensions;
}

export interface FormattedExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
errors?: ReadonlyArray<GraphQLFormattedError>;
data?: TData | null;
extensions?: TExtensions;
}

export interface ExperimentalIncrementalExecutionResults<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> {
initialResult: InitialIncrementalExecutionResult<TData, TExtensions>;
subsequentResults: AsyncGenerator<
SubsequentIncrementalExecutionResult<TData, TExtensions>,
void,
void
>;
}

export interface InitialIncrementalExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> extends ExecutionResult<TData, TExtensions> {
hasNext: boolean;
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
extensions?: TExtensions;
}

export interface FormattedInitialIncrementalExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
> extends FormattedExecutionResult<TData, TExtensions> {
hasNext: boolean;
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
extensions?: TExtensions;
}

export interface SubsequentIncrementalExecutionResult<
TData = ObjMap<unknown>,
TExtensions = ObjMap<unknown>,
Expand Down Expand Up @@ -113,86 +172,6 @@ export class IncrementalPublisher {
this._reset();
}

hasNext(): boolean {
return this._pending.size > 0;
}

subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
void
> {
let isDone = false;

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
// eslint-disable-next-line no-constant-condition
while (true) {
if (isDone) {
return { value: undefined, done: true };
}

for (const item of this._released) {
this._pending.delete(item);
}
const released = this._released;
this._released = new Set();

const result = this._getIncrementalResult(released);

if (!this.hasNext()) {
isDone = true;
}

if (result !== undefined) {
return { value: result, done: false };
}

// eslint-disable-next-line no-await-in-loop
await this._signalled;
}
};

const returnStreamIterators = async (): Promise<void> => {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
this._pending.forEach((incrementalDataRecord) => {
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
promises.push(incrementalDataRecord.asyncIterator.return());
}
});
await Promise.all(promises);
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
await returnStreamIterators();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
await returnStreamIterators();
return Promise.reject(error);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next: _next,
return: _return,
throw: _throw,
};
}

prepareInitialResultRecord(): InitialResultRecord {
return {
errors: [],
Expand Down Expand Up @@ -256,19 +235,26 @@ export class IncrementalPublisher {
incrementalDataRecord.errors.push(error);
}

publishInitial(initialResult: InitialResultRecord) {
for (const child of initialResult.children) {
if (child.filtered) {
continue;
}
this._publish(child);
handleInitialResultData(
initialResultRecord: InitialResultRecord,
data: PromiseOrValue<ObjMap<unknown>>,
): PromiseOrValue<ExecutionResult | ExperimentalIncrementalExecutionResults> {
if (isPromise(data)) {
return data.then(
(resolved) => this._buildInitialResponse(initialResultRecord, resolved),
(error) => this.handleInitialResultError(initialResultRecord, error),
);
}
return this._buildInitialResponse(initialResultRecord, data);
}

getInitialErrors(
initialResult: InitialResultRecord,
): ReadonlyArray<GraphQLError> {
return initialResult.errors;
handleInitialResultError(
initialResultRecord: InitialResultRecord,
error: GraphQLError,
): ExecutionResult {
const errors = initialResultRecord.errors;
errors.push(error);
return { data: null, errors };
}

filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) {
Expand Down Expand Up @@ -301,6 +287,111 @@ export class IncrementalPublisher {
});
}

private _buildInitialResponse(
initialResultRecord: InitialResultRecord,
data: ObjMap<unknown> | null,
): ExecutionResult | ExperimentalIncrementalExecutionResults {
for (const child of initialResultRecord.children) {
if (child.filtered) {
continue;
}
this._publish(child);
}

const errors = initialResultRecord.errors;
const initialResult = errors.length === 0 ? { data } : { errors, data };
if (this._hasNext()) {
return {
initialResult: {
...initialResult,
hasNext: true,
},
subsequentResults: this._subscribe(),
};
}
return initialResult;
}

private _hasNext(): boolean {
return this._pending.size > 0;
}

private _subscribe(): AsyncGenerator<
SubsequentIncrementalExecutionResult,
void,
void
> {
let isDone = false;

const _next = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
// eslint-disable-next-line no-constant-condition
while (true) {
if (isDone) {
return { value: undefined, done: true };
}

for (const item of this._released) {
this._pending.delete(item);
}
const released = this._released;
this._released = new Set();

const result = this._getIncrementalResult(released);

if (!this._hasNext()) {
isDone = true;
}

if (result !== undefined) {
return { value: result, done: false };
}

// eslint-disable-next-line no-await-in-loop
await this._signalled;
}
};

const returnStreamIterators = async (): Promise<void> => {
const promises: Array<Promise<IteratorResult<unknown>>> = [];
this._pending.forEach((incrementalDataRecord) => {
if (
isStreamItemsRecord(incrementalDataRecord) &&
incrementalDataRecord.asyncIterator?.return
) {
promises.push(incrementalDataRecord.asyncIterator.return());
}
});
await Promise.all(promises);
};

const _return = async (): Promise<
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
isDone = true;
await returnStreamIterators();
return { value: undefined, done: true };
};

const _throw = async (
error?: unknown,
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
isDone = true;
await returnStreamIterators();
return Promise.reject(error);
};

return {
[Symbol.asyncIterator]() {
return this;
},
next: _next,
return: _return,
throw: _throw,
};
}

private _trigger() {
this._resolve();
this._reset();
Expand Down Expand Up @@ -369,8 +460,8 @@ export class IncrementalPublisher {
}

return incrementalResults.length
? { incremental: incrementalResults, hasNext: this.hasNext() }
: encounteredCompletedAsyncIterator && !this.hasNext()
? { incremental: incrementalResults, hasNext: this._hasNext() }
: encounteredCompletedAsyncIterator && !this._hasNext()
? { hasNext: false }
: undefined;
}
Expand Down
6 changes: 4 additions & 2 deletions src/execution/__tests__/defer-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ import {
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
import { GraphQLSchema } from '../../type/schema.js';

import type { InitialIncrementalExecutionResult } from '../execute.js';
import { execute, experimentalExecuteIncrementally } from '../execute.js';
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
import type {
InitialIncrementalExecutionResult,
SubsequentIncrementalExecutionResult,
} from '../IncrementalPublisher.js';

const friendType = new GraphQLObjectType({
fields: {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/lists-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import { GraphQLSchema } from '../../type/schema.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import type { ExecutionResult } from '../execute.js';
import { execute, executeSync } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

describe('Execute: Accepts any iterable as list value', () => {
function complete(rootValue: unknown) {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/nonnull-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import { GraphQLSchema } from '../../type/schema.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import type { ExecutionResult } from '../execute.js';
import { execute, executeSync } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

const syncError = new Error('sync');
const syncNonNullError = new Error('syncNonNull');
Expand Down
2 changes: 1 addition & 1 deletion src/execution/__tests__/oneof-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import { parse } from '../../language/parser.js';

import { buildSchema } from '../../utilities/buildASTSchema.js';

import type { ExecutionResult } from '../execute.js';
import { execute } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

const schema = buildSchema(`
type Query {
Expand Down
6 changes: 4 additions & 2 deletions src/execution/__tests__/stream-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import {
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
import { GraphQLSchema } from '../../type/schema.js';

import type { InitialIncrementalExecutionResult } from '../execute.js';
import { experimentalExecuteIncrementally } from '../execute.js';
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
import type {
InitialIncrementalExecutionResult,
SubsequentIncrementalExecutionResult,
} from '../IncrementalPublisher.js';

const friendType = new GraphQLObjectType({
fields: {
Expand Down
3 changes: 2 additions & 1 deletion src/execution/__tests__/subscribe-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import {
} from '../../type/scalars.js';
import { GraphQLSchema } from '../../type/schema.js';

import type { ExecutionArgs, ExecutionResult } from '../execute.js';
import type { ExecutionArgs } from '../execute.js';
import { createSourceEventStream, subscribe } from '../execute.js';
import type { ExecutionResult } from '../IncrementalPublisher.js';

import { SimplePubSub } from './simplePubSub.js';

Expand Down
Loading

0 comments on commit bcb8ac2

Please sign in to comment.