Skip to content

Commit f25bb73

Browse files
committed
feat: add basic graphql-transport-ws protocol handler
1 parent eeb9e76 commit f25bb73

File tree

1 file changed

+204
-0
lines changed

1 file changed

+204
-0
lines changed

handler.ts

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
import {
2+
DocumentNode,
3+
execute,
4+
ExecutionArgs,
5+
ExecutionResult,
6+
getOperationAST,
7+
GraphQLError,
8+
OperationTypeNode,
9+
parse,
10+
Status,
11+
subscribe,
12+
tryCatchSync,
13+
validate,
14+
validateSchema,
15+
} from "./deps.ts";
16+
import { isAsyncGenerator, MessengerImpl } from "./utils.ts";
17+
import {
18+
CloseCode,
19+
GRAPHQL_TRANSPORT_WS_PROTOCOL,
20+
MessageType,
21+
} from "./constants.ts";
22+
import { resolveData } from "./resolve.ts";
23+
import { GraphQLExecutionArgs } from "./types.ts";
24+
25+
/**
26+
* @throws `AggregateError` - When GraphQL schema validation error has occurred.
27+
*/
28+
export default function createHandler(
29+
params: GraphQLExecutionArgs,
30+
): (req: Request) => Response | Promise<Response> {
31+
const validationResult = validateSchema(params.schema);
32+
if (validationResult.length) {
33+
throw new AggregateError(
34+
validationResult,
35+
"GraphQL schema validation error",
36+
);
37+
}
38+
return (req: Request): Response => {
39+
if (isWebsocketRequest(req)) {
40+
const protocol = req.headers.get("sec-websocket-protocol") ?? undefined;
41+
const { response, socket } = Deno.upgradeWebSocket(req, {
42+
protocol,
43+
});
44+
45+
register(socket, params);
46+
47+
return response;
48+
}
49+
50+
return new Response("Not Found", {
51+
status: Status.NotFound,
52+
});
53+
};
54+
}
55+
56+
function isWebsocketRequest(req: Request): boolean {
57+
const upgrade = req.headers.get("upgrade");
58+
59+
return upgrade === "websocket";
60+
}
61+
62+
function register(
63+
socket: WebSocket,
64+
args: GraphQLExecutionArgs,
65+
): void {
66+
socket.addEventListener("open", () => {
67+
if (!isValidProtocol(socket.protocol)) {
68+
return socket.close(
69+
CloseCode.SubprotocolNotAcceptable,
70+
"Sub protocol is not acceptable",
71+
);
72+
}
73+
}, {
74+
once: true,
75+
});
76+
77+
socket.addEventListener("message", async ({ data }) => {
78+
const [message, error] = resolveData(data);
79+
80+
if (!message) {
81+
return socket.close(
82+
CloseCode.BadRequest,
83+
`Invalid message received. ${error.message}`,
84+
);
85+
}
86+
87+
const messenger = new MessengerImpl();
88+
89+
switch (message.type) {
90+
case MessageType.ConnectionInit: {
91+
return safeSend(
92+
socket,
93+
JSON.stringify({
94+
type: MessageType.ConnectionAck,
95+
}),
96+
);
97+
}
98+
99+
case MessageType.Ping: {
100+
return safeSend(
101+
socket,
102+
JSON.stringify({
103+
type: MessageType.Pong,
104+
}),
105+
);
106+
}
107+
108+
case MessageType.Subscribe: {
109+
const { payload } = message;
110+
111+
const [document, error] = tryCatchSync<DocumentNode, GraphQLError>(() =>
112+
parse(payload.query)
113+
);
114+
if (!document) {
115+
const msg = messenger.errorMsg(message.id, [error]);
116+
return safeSend(socket, JSON.stringify(msg));
117+
}
118+
119+
const validationResult = validate(args.schema, document);
120+
if (validationResult.length) {
121+
const msg = messenger.errorMsg(message.id, validationResult);
122+
return safeSend(socket, JSON.stringify(msg));
123+
}
124+
125+
const operationAST = getOperationAST(document);
126+
127+
if (!operationAST) {
128+
const msg = messenger.errorMsg(message.id, [
129+
new GraphQLError("Unable to identify operation"),
130+
]);
131+
132+
return safeSend(socket, JSON.stringify(msg));
133+
}
134+
135+
const executor = getExecutor(operationAST.operation);
136+
137+
const executionArgs: ExecutionArgs = {
138+
...args,
139+
document,
140+
...payload,
141+
};
142+
143+
const executionResult = await executor(executionArgs);
144+
145+
if (isAsyncGenerator(executionResult)) {
146+
for await (const result of executionResult) {
147+
const msg = messenger.nextMsg({ payload: result, id: message.id });
148+
safeSend(socket, JSON.stringify(msg));
149+
}
150+
} else {
151+
const msg = isRequestError(executionResult)
152+
? messenger.errorMsg(message.id, executionResult.errors)
153+
: messenger.nextMsg({
154+
payload: executionResult,
155+
id: message.id,
156+
});
157+
158+
safeSend(socket, JSON.stringify(msg));
159+
}
160+
161+
const msg = messenger.completeMsg(message.id);
162+
safeSend(socket, JSON.stringify(msg));
163+
return;
164+
}
165+
}
166+
});
167+
}
168+
169+
function safeSend(
170+
socket: WebSocket,
171+
data: string | ArrayBufferLike | Blob | ArrayBufferView,
172+
) {
173+
if (socket.readyState === socket.OPEN) {
174+
try {
175+
socket.send(data);
176+
} catch {
177+
// noop
178+
}
179+
}
180+
}
181+
182+
function getExecutor(
183+
operationTypeNode: OperationTypeNode,
184+
): typeof subscribe | typeof execute {
185+
return operationTypeNode === "subscription" ? subscribe : execute;
186+
}
187+
188+
function isValidProtocol(
189+
protocol: string,
190+
): protocol is typeof GRAPHQL_TRANSPORT_WS_PROTOCOL {
191+
return [GRAPHQL_TRANSPORT_WS_PROTOCOL].includes(
192+
protocol,
193+
);
194+
}
195+
196+
type RequestErrorResult = {
197+
errors: [];
198+
};
199+
200+
function isRequestError(
201+
executionResult: ExecutionResult,
202+
): executionResult is RequestErrorResult {
203+
return !("data" in executionResult);
204+
}

0 commit comments

Comments
 (0)