Skip to content

Commit 7784776

Browse files
SSE transport (#5)
1 parent 439f15a commit 7784776

File tree

3 files changed

+90
-6
lines changed

3 files changed

+90
-6
lines changed

src/tools/get-api-usage-stats.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class GetApiUsageTool extends LinkedApiTool<TApiUsageParams, TApiUsageAct
2626
public override getTool(): Tool {
2727
return {
2828
name: this.name,
29-
description: 'Retrieve Linked API usage statistics',
29+
description: 'Retrieve Linked API usage statistics. Date range must not exceed 30 days.',
3030
inputSchema: {
3131
type: 'object',
3232
properties: {
@@ -38,7 +38,7 @@ export class GetApiUsageTool extends LinkedApiTool<TApiUsageParams, TApiUsageAct
3838
end: {
3939
type: 'string',
4040
description:
41-
"End date for the statistics period in ISO 8601 format (e.g., '2024-01-31T23:59:59Z')",
41+
"End date for the statistics period in ISO 8601 format (e.g., '2024-01-30T00:00:00Z')",
4242
},
4343
},
4444
required: ['start', 'end'],

src/tools/retrieve-connections.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export class RetrieveConnectionsTool extends OperationTool<
1515
public override readonly name = 'retrieve_connections';
1616
public override readonly operationName = OPERATION_NAME.retrieveConnections;
1717
protected override readonly schema = z.object({
18-
limit: z.number().min(1).max(100).optional(),
18+
limit: z.number().min(1).max(1000).optional(),
1919
filter: z
2020
.object({
2121
firstName: z.string().optional(),

src/utils/json-http-transport.ts

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ type ConnectionContext = {
1717
responses: Map<RequestId, JSONRPCMessage>;
1818
};
1919

20+
type SseContext = {
21+
res: ServerResponse;
22+
keepalive: NodeJS.Timeout;
23+
};
24+
2025
export class JsonHTTPServerTransport implements Transport {
2126
public onclose?: () => void;
2227
public onerror?: (error: Error) => void;
@@ -28,13 +33,25 @@ export class JsonHTTPServerTransport implements Transport {
2833
private started = false;
2934
private requestIdToConn = new Map<RequestId, string>();
3035
private connections = new Map<string, ConnectionContext>();
36+
private sse?: SseContext;
3137

3238
async start(): Promise<void> {
3339
if (this.started) throw new Error('Transport already started');
3440
this.started = true;
3541
}
3642

3743
async close(): Promise<void> {
44+
if (this.sse) {
45+
try {
46+
clearInterval(this.sse.keepalive);
47+
if (!this.sse.res.writableEnded) {
48+
this.sse.res.end();
49+
}
50+
} catch {
51+
// ignore
52+
}
53+
this.sse = undefined;
54+
}
3855
this.connections.forEach((ctx) => {
3956
try {
4057
if (!ctx.res.writableEnded) {
@@ -50,6 +67,13 @@ export class JsonHTTPServerTransport implements Transport {
5067
}
5168

5269
async send(message: JSONRPCMessage, options?: TransportSendOptions): Promise<void> {
70+
// If SSE is connected, stream all server -> client messages via SSE
71+
if (this.sse && !this.sse.res.writableEnded) {
72+
const line = `data: ${JSON.stringify(message)}\n\n`;
73+
this.sse.res.write(line);
74+
return;
75+
}
76+
5377
let relatedId = options?.relatedRequestId;
5478
if (isJSONRPCResponse(message) || isJSONRPCError(message)) {
5579
relatedId = message.id;
@@ -82,13 +106,55 @@ export class JsonHTTPServerTransport implements Transport {
82106
ctx.orderedIds.forEach((id) => this.requestIdToConn.delete(id));
83107
}
84108

85-
// Handle only POST requests; no SSE/GET support
109+
// Handle HTTP requests: supports POST for JSON and GET for SSE
86110
async handleRequest(
87111
req: IncomingMessage & { auth?: unknown },
88112
res: ServerResponse,
89113
parsedBody?: unknown,
90114
): Promise<void> {
91115
try {
116+
// SSE endpoint: accept GET with text/event-stream
117+
const acceptHeader = (req.headers['accept'] || '').toString();
118+
if (req.method === 'GET' && acceptHeader.includes('text/event-stream')) {
119+
// Close previous SSE if any
120+
if (this.sse) {
121+
try {
122+
clearInterval(this.sse.keepalive);
123+
if (!this.sse.res.writableEnded) this.sse.res.end();
124+
} catch {
125+
// ignore
126+
}
127+
}
128+
129+
res.writeHead(200, {
130+
'Content-Type': 'text/event-stream',
131+
'Cache-Control': 'no-cache, no-transform',
132+
Connection: 'keep-alive',
133+
'X-Accel-Buffering': 'no',
134+
});
135+
// Send an initial comment to establish the stream
136+
res.write(': connected\n\n');
137+
138+
const keepalive = setInterval(() => {
139+
if (res.writableEnded) return;
140+
res.write('event: ping\ndata: {}\n\n');
141+
}, 25000);
142+
143+
this.sse = {
144+
res,
145+
keepalive,
146+
};
147+
148+
res.on('close', () => {
149+
try {
150+
clearInterval(keepalive);
151+
} finally {
152+
this.sse = undefined;
153+
}
154+
});
155+
return;
156+
}
157+
92158
if (req.method !== 'POST') {
93159
res.writeHead(405, { Allow: 'POST' }).end(
94160
JSON.stringify({
@@ -103,15 +169,18 @@ export class JsonHTTPServerTransport implements Transport {
103169
return;
104170
}
105171

172+
// For POST, allow generic Accepts; when SSE is connected, we don't require JSON accept
106173
const accept = req.headers['accept'];
107-
if (!(accept && accept.includes('application/json'))) {
174+
const acceptsJson = !!(accept && accept.includes('application/json'));
175+
const sseActive = !!this.sse && !this.sse.res.writableEnded;
176+
if (!acceptsJson && !sseActive) {
108177
res.writeHead(406);
109178
res.end(
110179
JSON.stringify({
111180
jsonrpc: '2.0',
112181
error: {
113182
code: -32000,
114-
message: 'Not Acceptable: Client must accept application/json',
183+
message: 'Not Acceptable: Client must accept application/json or have SSE open',
115184
},
116185
id: null,
117186
}),
@@ -165,6 +234,21 @@ export class JsonHTTPServerTransport implements Transport {
165234
}
166235

167236
const orderedIds: RequestId[] = messages.filter(isJSONRPCRequest).map((m) => m.id);
237+
const sseConnected = !!this.sse && !this.sse.res.writableEnded;
238+
if (sseConnected) {
239+
// With SSE, we emit responses on the SSE stream; reply 202 to POST immediately
240+
res.writeHead(202).end();
241+
for (const msg of messages) {
242+
this.onmessage?.(msg, {
243+
requestInfo: {
244+
headers: req.headers,
245+
},
246+
authInfo: req.auth,
247+
});
248+
}
249+
return;
250+
}
251+
168252
const connId = `${Date.now()}-${Math.random()}`;
169253
this.connections.set(connId, {
170254
res,

0 commit comments

Comments
 (0)