Skip to content

Commit 1f9ee16

Browse files
committed
Change ajax_stream to use new-line delimited JSON (#52797)
1 parent 5ad2732 commit 1f9ee16

File tree

3 files changed

+34
-48
lines changed

3 files changed

+34
-48
lines changed

src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ describe('ajaxStream', () => {
2626
it('pulls items from the stream and calls the handler', async () => {
2727
const handler = jest.fn(() => ({}));
2828
const { req, sendText, done } = mockRequest();
29-
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
29+
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'];
3030

3131
const promise = ajaxStream('', {}, req, {
3232
url: '/test/endpoint',
@@ -43,12 +43,34 @@ describe('ajaxStream', () => {
4343
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
4444
});
4545

46+
it('handles newlines in values', async () => {
47+
const handler = jest.fn(() => ({}));
48+
const { req, sendText, done } = mockRequest();
49+
const messages = [
50+
JSON.stringify({ hello: 'wo\nrld' }),
51+
'\n',
52+
JSON.stringify({ tis: 'fa\nte' }),
53+
'\n',
54+
];
55+
56+
const promise = ajaxStream('', {}, req, {
57+
url: '/test/endpoint',
58+
onResponse: handler,
59+
});
60+
61+
messages.forEach(sendText);
62+
done();
63+
64+
await promise;
65+
expect(handler).toHaveBeenCalledTimes(2);
66+
expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' });
67+
expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' });
68+
});
69+
4670
it('handles partial messages', async () => {
4771
const handler = jest.fn(() => ({}));
4872
const { req, sendText, done } = mockRequest();
49-
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
50-
.map(m => `${m.length}:${m}`)
51-
.join('');
73+
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');
5274

5375
const promise = ajaxStream('', {}, req, {
5476
url: '/test/endpoint',
@@ -117,7 +139,7 @@ describe('ajaxStream', () => {
117139
it('rejects if the payload contains invalid JSON', async () => {
118140
const handler = jest.fn(() => ({}));
119141
const { req, sendText, done } = mockRequest();
120-
const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
142+
const messages = ['{ waut? }\n'].join('');
121143

122144
const promise = ajaxStream('', {}, req, {
123145
url: '/test/endpoint',
@@ -130,32 +152,12 @@ describe('ajaxStream', () => {
130152
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
131153
});
132154

133-
it('rejects if the delim is invalid', async () => {
134-
const handler = jest.fn(() => ({}));
135-
const { req, sendText, done } = mockRequest();
136-
const messages = '{ "hi": "there" }';
137-
138-
const promise = ajaxStream('', {}, req, {
139-
url: '/test/endpoint',
140-
onResponse: handler,
141-
});
142-
143-
sendText(messages);
144-
done();
145-
146-
expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
147-
/invalid stream response/i
148-
);
149-
});
150-
151155
it('rejects if the handler throws', async () => {
152156
const handler = jest.fn(() => {
153157
throw new Error('DOH!');
154158
});
155159
const { req, sendText, done } = mockRequest();
156-
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
157-
.map(m => `${m.length}:${m}`)
158-
.join('');
160+
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');
159161

160162
const promise = ajaxStream('', {}, req, {
161163
url: '/test/endpoint',

src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,34 +64,19 @@ function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
6464
return (text: string) => {
6565
// While there's text to process...
6666
while (index < text.length) {
67-
// Our messages are delimited by colon: len:json
68-
const delim = ':';
67+
// We're using new line-delimited JSON.
68+
const delim = '\n';
6969
const delimIndex = text.indexOf(delim, index);
70-
const payloadStart = delimIndex + delim.length;
7170

7271
// We've got an incomplete batch length
7372
if (delimIndex < 0) {
7473
return;
7574
}
7675

77-
const rawLen = text.slice(index, delimIndex);
78-
const payloadLen = parseInt(rawLen, 10);
79-
const payloadEnd = payloadStart + payloadLen;
80-
81-
// We've got an invalid batch message (e.g. one without a numeric length: prefix)
82-
if (isNaN(payloadLen)) {
83-
throw new Error(`Invalid stream response length: ${rawLen}`);
84-
}
85-
86-
// We've got an incomplete batch message
87-
if (text.length < payloadEnd) {
88-
return;
89-
}
90-
91-
const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
76+
const payload = JSON.parse(text.slice(index, delimIndex));
9277
handler(payload);
9378

94-
index = payloadEnd;
79+
index = delimIndex + 1;
9580
}
9681
};
9782
}

src/legacy/core_plugins/interpreter/server/routes/server_functions.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,15 @@ function runServerFunctions(server: any) {
7575

7676
// Send the initial headers.
7777
res.writeHead(200, {
78-
'Content-Type': 'text/plain',
78+
'Content-Type': 'application/x-ndjson',
7979
Connection: 'keep-alive',
8080
'Transfer-Encoding': 'chunked',
8181
'Cache-Control': 'no-cache',
8282
});
8383

8484
// Write a length-delimited response
8585
const streamResult = (result: any) => {
86-
const payload = JSON.stringify(result) + '\n';
87-
res.write(`${payload.length}:${payload}`);
86+
res.write(JSON.stringify(result) + '\n');
8887
};
8988

9089
// Tries to run an interpreter function, and ensures a consistent error payload on failure.

0 commit comments

Comments
 (0)