Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('ajaxStream', () => {
it('pulls items from the stream and calls the handler', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].map(m => `${m.length}:${m}`);
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'];

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand All @@ -43,12 +43,34 @@ describe('ajaxStream', () => {
expect(handler).toHaveBeenCalledWith({ tis: 'fate' });
});

it('handles newlines in values', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = [
JSON.stringify({ hello: 'wo\nrld' }),
'\n',
JSON.stringify({ tis: 'fa\nte' }),
'\n',
];

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});

messages.forEach(sendText);
done();

await promise;
expect(handler).toHaveBeenCalledTimes(2);
expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' });
expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' });
});

it('handles partial messages', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand Down Expand Up @@ -117,7 +139,7 @@ describe('ajaxStream', () => {
it('rejects if the payload contains invalid JSON', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = ['{ waut? }\n'].map(m => `${m.length}:${m}`).join('');
const messages = ['{ waut? }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand All @@ -130,32 +152,12 @@ describe('ajaxStream', () => {
expect(await promise.then(() => true).catch(() => false)).toBeFalsy();
});

it('rejects if the delim is invalid', async () => {
const handler = jest.fn(() => ({}));
const { req, sendText, done } = mockRequest();
const messages = '{ "hi": "there" }';

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
onResponse: handler,
});

sendText(messages);
done();

expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(
/invalid stream response/i
);
});

it('rejects if the handler throws', async () => {
const handler = jest.fn(() => {
throw new Error('DOH!');
});
const { req, sendText, done } = mockRequest();
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']
.map(m => `${m.length}:${m}`)
.join('');
const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join('');

const promise = ajaxStream('', {}, req, {
url: '/test/endpoint',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,19 @@ function processBatchResponseStream<T>(handler: BatchResponseHandler<T>) {
return (text: string) => {
// While there's text to process...
while (index < text.length) {
// Our messages are delimited by colon: len:json
const delim = ':';
// We're using new line-delimited JSON.
const delim = '\n';
const delimIndex = text.indexOf(delim, index);
const payloadStart = delimIndex + delim.length;

// We've got an incomplete batch length
if (delimIndex < 0) {
return;
}

const rawLen = text.slice(index, delimIndex);
const payloadLen = parseInt(rawLen, 10);
const payloadEnd = payloadStart + payloadLen;

// We've got an invalid batch message (e.g. one without a numeric length: prefix)
if (isNaN(payloadLen)) {
throw new Error(`Invalid stream response length: ${rawLen}`);
}

// We've got an incomplete batch message
if (text.length < payloadEnd) {
return;
}

const payload = JSON.parse(text.slice(payloadStart, payloadEnd));
const payload = JSON.parse(text.slice(index, delimIndex));
handler(payload);

index = payloadEnd;
index = delimIndex + 1;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,15 @@ function runServerFunctions(server: any) {

// Send the initial headers.
res.writeHead(200, {
'Content-Type': 'text/plain',
'Content-Type': 'application/x-ndjson',
Connection: 'keep-alive',
'Transfer-Encoding': 'chunked',
'Cache-Control': 'no-cache',
});

// Write a length-delimited response
const streamResult = (result: any) => {
const payload = JSON.stringify(result) + '\n';
res.write(`${payload.length}:${payload}`);
res.write(JSON.stringify(result) + '\n');
};

// Tries to run an interpreter function, and ensures a consistent error payload on failure.
Expand Down