|
| 1 | +import { DocumentNode, OperationTypeNode } from 'graphql' |
| 2 | +import type { |
| 3 | + GraphQLHandlerNameSelector, |
| 4 | + GraphQLQuery, |
| 5 | + GraphQLVariables, |
| 6 | +} from './GraphQLHandler' |
| 7 | +import type { Path } from '../utils/matching/matchRequestUrl' |
| 8 | +import { parseDocumentNode } from '../utils/internal/parseGraphQLRequest' |
| 9 | +import { WebSocketLink, ws } from '../ws' |
| 10 | +import { WebSocketHandler } from './WebSocketHandler' |
| 11 | +import { jsonParse } from '../utils/internal/jsonParse' |
| 12 | +import type { TypedDocumentNode } from '../graphql' |
| 13 | + |
| 14 | +export interface GraphQLPubsub { |
| 15 | + /** |
| 16 | + * A WebSocket handler to intercept GraphQL subscription events. |
| 17 | + */ |
| 18 | + handler: WebSocketHandler |
| 19 | + |
| 20 | + /** |
| 21 | + * Publishes the given payload to all GraphQL subscriptions. |
| 22 | + */ |
| 23 | + publish: (payload: { data?: Record<string, unknown> }) => void |
| 24 | +} |
| 25 | + |
| 26 | +export class GraphQLInternalPubsub { |
| 27 | + public pubsub: GraphQLPubsub |
| 28 | + public webSocketLink: WebSocketLink |
| 29 | + private subscriptions: Set<string> |
| 30 | + |
| 31 | + constructor(public readonly url: Path) { |
| 32 | + this.subscriptions = new Set() |
| 33 | + |
| 34 | + /** |
| 35 | + * @fixme This isn't nice. |
| 36 | + * This is here to translate HTTP URLs from `graphql.link` to a WS URL. |
| 37 | + * Works for strings but won't work for RegExp. |
| 38 | + */ |
| 39 | + const webSocketUrl = |
| 40 | + typeof url === 'string' ? url.replace(/^http/, 'ws') : url |
| 41 | + |
| 42 | + /** |
| 43 | + * @todo Support `log: false` not to print logs from the underlying WS handler. |
| 44 | + */ |
| 45 | + this.webSocketLink = ws.link(webSocketUrl) |
| 46 | + |
| 47 | + const webSocketHandler = this.webSocketLink.addEventListener( |
| 48 | + 'connection', |
| 49 | + ({ client }) => { |
| 50 | + client.addEventListener('message', (event) => { |
| 51 | + if (typeof event.data !== 'string') { |
| 52 | + return |
| 53 | + } |
| 54 | + |
| 55 | + const message = jsonParse(event.data) |
| 56 | + |
| 57 | + if (!message) { |
| 58 | + return |
| 59 | + } |
| 60 | + |
| 61 | + switch (message.type) { |
| 62 | + case 'connection_init': { |
| 63 | + client.send(JSON.stringify({ type: 'connection_ack' })) |
| 64 | + break |
| 65 | + } |
| 66 | + |
| 67 | + case 'subscribe': { |
| 68 | + this.subscriptions.add(message.id) |
| 69 | + break |
| 70 | + } |
| 71 | + |
| 72 | + case 'complete': { |
| 73 | + this.subscriptions.delete(message.id) |
| 74 | + break |
| 75 | + } |
| 76 | + } |
| 77 | + }) |
| 78 | + }, |
| 79 | + ) |
| 80 | + |
| 81 | + this.pubsub = { |
| 82 | + handler: webSocketHandler, |
| 83 | + publish: (payload) => { |
| 84 | + for (const subscriptionId of this.subscriptions) { |
| 85 | + this.webSocketLink.broadcast( |
| 86 | + this.createSubscriptionMessage({ |
| 87 | + id: subscriptionId, |
| 88 | + payload, |
| 89 | + }), |
| 90 | + ) |
| 91 | + } |
| 92 | + }, |
| 93 | + } |
| 94 | + } |
| 95 | + |
| 96 | + private createSubscriptionMessage(args: { id: string; payload: unknown }) { |
| 97 | + return JSON.stringify({ |
| 98 | + id: args.id, |
| 99 | + type: 'next', |
| 100 | + payload: args.payload, |
| 101 | + }) |
| 102 | + } |
| 103 | +} |
| 104 | + |
| 105 | +export type GraphQLSubscriptionHandler = < |
| 106 | + Query extends GraphQLQuery = GraphQLQuery, |
| 107 | + Variables extends GraphQLVariables = GraphQLVariables, |
| 108 | +>( |
| 109 | + operationName: |
| 110 | + | GraphQLHandlerNameSelector |
| 111 | + | DocumentNode |
| 112 | + | TypedDocumentNode<Query, Variables>, |
| 113 | + resolver: (info: GraphQLSubscriptionHandlerInfo<Variables>) => void, |
| 114 | +) => WebSocketHandler |
| 115 | + |
| 116 | +export interface GraphQLSubscriptionHandlerInfo< |
| 117 | + Variables extends GraphQLVariables, |
| 118 | +> { |
| 119 | + operationName: string |
| 120 | + query: string |
| 121 | + variables: Variables |
| 122 | +} |
| 123 | + |
| 124 | +export function createGraphQLSubscriptionHandler( |
| 125 | + webSocketLink: WebSocketLink, |
| 126 | +): GraphQLSubscriptionHandler { |
| 127 | + return (operationName, resolver) => { |
| 128 | + const webSocketHandler = webSocketLink.addEventListener( |
| 129 | + 'connection', |
| 130 | + ({ client }) => { |
| 131 | + client.addEventListener('message', async (event) => { |
| 132 | + if (typeof event.data !== 'string') { |
| 133 | + return |
| 134 | + } |
| 135 | + |
| 136 | + const message = jsonParse(event.data) |
| 137 | + |
| 138 | + if ( |
| 139 | + message != null && |
| 140 | + 'type' in message && |
| 141 | + message.type === 'subscribe' |
| 142 | + ) { |
| 143 | + const { parse } = await import('graphql') |
| 144 | + const document = parse(message.payload.query) |
| 145 | + const node = parseDocumentNode(document) |
| 146 | + |
| 147 | + if ( |
| 148 | + node.operationType === OperationTypeNode.SUBSCRIPTION && |
| 149 | + node.operationName === operationName |
| 150 | + ) { |
| 151 | + /** |
| 152 | + * @todo Add the path parameters from the pubsub URL. |
| 153 | + */ |
| 154 | + resolver({ |
| 155 | + operationName: node.operationName, |
| 156 | + query: message.payload.query, |
| 157 | + variables: message.payload.variables, |
| 158 | + }) |
| 159 | + } |
| 160 | + } |
| 161 | + }) |
| 162 | + }, |
| 163 | + ) |
| 164 | + |
| 165 | + return webSocketHandler |
| 166 | + } |
| 167 | +} |
0 commit comments