Skip to content

Commit

Permalink
add pending notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Jul 4, 2023
1 parent 80a8689 commit bcf304f
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 4 deletions.
64 changes: 60 additions & 4 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type {
import type { StreamUsage } from './execute.js';

interface IncrementalUpdate<TData = unknown, TExtensions = ObjMap<unknown>> {
pending: ReadonlyArray<PendingResult>;
incremental: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
completed: ReadonlyArray<CompletedResult>;
}
Expand Down Expand Up @@ -67,6 +68,7 @@ export interface InitialIncrementalExecutionResult<
TExtensions = ObjMap<unknown>,
> extends ExecutionResult<TData, TExtensions> {
data: TData;
pending: ReadonlyArray<PendingResult>;
hasNext: true;
extensions?: TExtensions;
}
Expand All @@ -76,6 +78,7 @@ export interface FormattedInitialIncrementalExecutionResult<
TExtensions = ObjMap<unknown>,
> extends FormattedExecutionResult<TData, TExtensions> {
data: TData;
pending: ReadonlyArray<PendingResult>;
hasNext: boolean;
extensions?: TExtensions;
}
Expand All @@ -93,6 +96,7 @@ export interface FormattedSubsequentIncrementalExecutionResult<
TExtensions = ObjMap<unknown>,
> {
hasNext: boolean;
pending?: ReadonlyArray<PendingResult>;
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
completed?: ReadonlyArray<FormattedCompletedResult>;
extensions?: TExtensions;
Expand Down Expand Up @@ -149,6 +153,11 @@ export type FormattedIncrementalResult<
| FormattedIncrementalDeferResult<TData, TExtensions>
| FormattedIncrementalStreamResult<TData, TExtensions>;

export interface PendingResult {
path: ReadonlyArray<string | number>;
label?: string;
}

export interface CompletedResult {
path: ReadonlyArray<string | number>;
label?: string;
Expand Down Expand Up @@ -417,6 +426,23 @@ export class IncrementalPublisher {
});
}

pendingSourcesToResults(
pendingSources: ReadonlySet<DeferredFragmentRecord | StreamRecord>,
): Array<PendingResult> {
const pendingResults: Array<PendingResult> = [];
for (const pendingSource of pendingSources) {
pendingSource.pendingSent = true;
const pendingResult: PendingResult = {
path: pendingSource.path,
};
if (pendingSource.label !== undefined) {
pendingResult.label = pendingSource.label;
}
pendingResults.push(pendingResult);
}
return pendingResults;
}

private _buildInitialResponse(
initialResultRecord: InitialResultRecord,
data: ObjMap<unknown> | null,
Expand All @@ -430,10 +456,20 @@ export class IncrementalPublisher {

const errors = initialResultRecord.errors;
const initialResult = errors.length === 0 ? { data } : { errors, data };
if (this._hasNext()) {
const pending = this._getPending();
if (pending.size > 0) {
const pendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
for (const subsequentResultRecord of pending) {
const pendingSource = isStreamItemsRecord(subsequentResultRecord)
? subsequentResultRecord.streamRecord
: subsequentResultRecord;
pendingSources.add(pendingSource);
}

return {
initialResult: {
...initialResult,
pending: this.pendingSourcesToResults(pendingSources),
hasNext: true,
},
subsequentResults: this._subscribe(),
Expand All @@ -442,6 +478,10 @@ export class IncrementalPublisher {
return initialResult;
}

private _getPending(): ReadonlySet<SubsequentResultRecord> {
return this._pending;
}

private _hasNext(): boolean {
return this._pending.size > 0;
}
Expand Down Expand Up @@ -566,14 +606,18 @@ export class IncrementalPublisher {
private _getIncrementalResult(
completedRecords: ReadonlySet<SubsequentResultRecord>,
): SubsequentIncrementalExecutionResult | undefined {
const { incremental, completed } = this._processPending(completedRecords);
const { pending, incremental, completed } =
this._processPending(completedRecords);

const hasNext = this._hasNext();
if (incremental.length === 0 && completed.length === 0 && hasNext) {
return undefined;
}

const result: SubsequentIncrementalExecutionResult = { hasNext };
if (pending.length) {
result.pending = pending;
}
if (incremental.length) {
result.incremental = incremental;
}
Expand All @@ -587,19 +631,27 @@ export class IncrementalPublisher {
private _processPending(
completedRecords: ReadonlySet<SubsequentResultRecord>,
): IncrementalUpdate {
const newPendingSources = new Set<DeferredFragmentRecord | StreamRecord>();
const incrementalResults: Array<IncrementalResult> = [];
const completedResults: Array<CompletedResult> = [];
for (const subsequentResultRecord of completedRecords) {
for (const child of subsequentResultRecord.children) {
if (child.filtered) {
continue;
}
const pendingSource = isStreamItemsRecord(child)
? child.streamRecord
: child;
if (!pendingSource.pendingSent) {
newPendingSources.add(pendingSource);
}
this._publish(child);
}
if (isStreamItemsRecord(subsequentResultRecord)) {
if (!subsequentResultRecord.sent) {
subsequentResultRecord.sent = true;
if (subsequentResultRecord.isFinalRecord) {
newPendingSources.delete(subsequentResultRecord.streamRecord);
completedResults.push(
this._completedRecordToResult(
subsequentResultRecord.streamRecord,
Expand All @@ -623,6 +675,7 @@ export class IncrementalPublisher {
incrementalResults.push(incrementalResult);
}
} else {
newPendingSources.delete(subsequentResultRecord);
completedResults.push(
this._completedRecordToResult(subsequentResultRecord),
);
Expand All @@ -647,6 +700,7 @@ export class IncrementalPublisher {
}

return {
pending: this.pendingSourcesToResults(newPendingSources),
incremental: incrementalResults,
completed: completedResults,
};
Expand Down Expand Up @@ -805,6 +859,7 @@ export class DeferredFragmentRecord {
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
errors: Array<GraphQLError>;
filtered: boolean;
pendingSent?: boolean;
_pending: Set<DeferredGroupedFieldSetRecord>;

constructor(opts: { path: Path | undefined; label: string | undefined }) {
Expand All @@ -824,6 +879,7 @@ export class StreamRecord {
path: ReadonlyArray<string | number>;
errors: Array<GraphQLError>;
asyncIterator?: AsyncIterator<unknown> | undefined;
pendingSent?: boolean;
constructor(opts: {
label: string | undefined;
path: Path;
Expand Down Expand Up @@ -867,15 +923,15 @@ export type IncrementalDataRecord =
| DeferredGroupedFieldSetRecord
| StreamItemsRecord;

type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;
export type SubsequentResultRecord = DeferredFragmentRecord | StreamItemsRecord;

function isDeferredGroupedFieldSetRecord(
incrementalDataRecord: unknown,
): incrementalDataRecord is DeferredGroupedFieldSetRecord {
return incrementalDataRecord instanceof DeferredGroupedFieldSetRecord;
}

function isStreamItemsRecord(
export function isStreamItemsRecord(
subsequentResultRecord: unknown,
): subsequentResultRecord is StreamItemsRecord {
return subsequentResultRecord instanceof StreamItemsRecord;
Expand Down
Loading

0 comments on commit bcf304f

Please sign in to comment.