Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delta stream parser #1280

Merged
merged 2 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "querybook",
"version": "3.25.0",
"version": "3.25.1",
"description": "A Big Data Webapp",
"private": true,
"scripts": {
Expand Down
64 changes: 48 additions & 16 deletions querybook/webapp/__tests__/lib/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,56 @@ import { DeltaStreamParser } from 'lib/stream';
describe('DeltaStreamParser', () => {
it('Works for stream without key/value pairs', () => {
const parser = new DeltaStreamParser();
expect(parser.parse('some data')).toEqual({
parser.parse('some data');
expect(parser.result).toEqual({
data: 'some data',
});
expect(parser.parse('\nsome more data')).toEqual({
parser.parse('\nsome more data');
expect(parser.result).toEqual({
data: 'some data\nsome more data',
});
});

it('Works for stream ending with non empty buffer', () => {
const parser = new DeltaStreamParser();
parser.parse('201');
parser.parse('9');
expect(parser.result).toEqual({
data: '201',
});
parser.close();
expect(parser.result).toEqual({
data: '2019',
});
});

it('Works for stream with both data and key/value pairs', () => {
const parser = new DeltaStreamParser();
parser.parse('some data');
expect(parser.parse('\n<@some_key@>\nsome value')).toEqual({
parser.parse('\n<@some_key@>\nsome value');
expect(parser.result).toEqual({
data: 'some data\n',
some_key: 'some value',
});
});

it('Works for stream with only key/value pairs', () => {
const parser = new DeltaStreamParser();
expect(parser.parse('<@some_key@>')).toEqual({

parser.parse('<@some_key@>');
expect(parser.result).toEqual({
data: '',
some_key: '',
});
expect(parser.parse('\nsome value\n')).toEqual({

parser.parse('\nsome value\n');
expect(parser.result).toEqual({
data: '',
some_key: 'some value\n',
});
expect(parser.parse('<@another_key@>\nanother value')).toEqual({

parser.parse('<@another_key@>\nanother value');
expect(parser.result).toEqual({
data: '',
some_key: 'some value\n',
another_key: 'another value',
Expand All @@ -39,44 +61,54 @@ describe('DeltaStreamParser', () => {

it('Works for partial stream', () => {
const parser = new DeltaStreamParser();
expect(parser.parse('some da')).toEqual({
parser.parse('some da');
expect(parser.result).toEqual({
data: 'some da',
});
// wait for <@ to be complete before parsing
expect(parser.parse('ta<')).toEqual({
parser.parse('ta<');
expect(parser.result).toEqual({
data: 'some da',
});
// the next char is not @, so it will be treated as data
expect(parser.parse('ta')).toEqual({
parser.parse('ta');
expect(parser.result).toEqual({
data: 'some data<ta',
});

// wait for <@ to be complete before parsing
expect(parser.parse('<')).toEqual({
parser.parse('<');
expect(parser.result).toEqual({
data: 'some data<ta',
});
// the next char is @, so the following will be treated as key
expect(parser.parse('@som')).toEqual({
parser.parse('@som');
expect(parser.result).toEqual({
data: 'some data<ta',
});
// wait for @> to be complete before parsing
expect(parser.parse('e_key@')).toEqual({
parser.parse('e_key@');
expect(parser.result).toEqual({
data: 'some data<ta',
});
expect(parser.parse('>\n')).toEqual({
parser.parse('>\n');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: '',
});
expect(parser.parse('som')).toEqual({
parser.parse('som');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: 'som',
});
expect(parser.parse('e value')).toEqual({
parser.parse('e value');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: 'some value',
});
// ignore the last incomplete key
expect(parser.parse('<@ano')).toEqual({
parser.parse('<@ano');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: 'some value',
});
Expand Down
4 changes: 2 additions & 2 deletions querybook/webapp/components/QueryCellTitle/QueryCellTitle.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const QueryCellTitle: React.FC<IQueryCellTitleProps> = ({
onChange(title);
}, [title]);

const handleOnClick = useCallback(() => {
const handleTitleGenerationClick = useCallback(() => {
startStream();
trackClick({
component: ComponentType.AI_ASSISTANT,
Expand All @@ -61,7 +61,7 @@ export const QueryCellTitle: React.FC<IQueryCellTitleProps> = ({
size={18}
tooltip="AI: generate title"
color={!value && query ? 'accent' : undefined}
onClick={handleOnClick}
onClick={handleTitleGenerationClick}
/>
)}
<ResizableTextArea
Expand Down
3 changes: 2 additions & 1 deletion querybook/webapp/hooks/useStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export function useStream(
setSteamStatus(StreamStatus.STREAMING);
setData({});

ds.stream(url, params, setData, () => {
ds.stream(url, params, setData, (data) => {
setData(data);
setSteamStatus(StreamStatus.FINISHED);
});
}, [url, params]);
Expand Down
9 changes: 5 additions & 4 deletions querybook/webapp/lib/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,16 @@ function streamDatasource(
url: string,
params?: Record<string, unknown>,
onStreaming?: (data: { [key: string]: string }) => void,
onStreamingEnd?: () => void
onStreamingEnd?: (data?: { [key: string]: string }) => void
) {
const eventSource = new EventSource(
`${url}?params=${JSON.stringify(params)}`
);
const parser = new DeltaStreamParser();
eventSource.addEventListener('message', (e) => {
const newToken = JSON.parse(e.data).data;
const data = parser.parse(newToken);
onStreaming?.(data);
parser.parse(newToken);
onStreaming?.(parser.result);
});
eventSource.addEventListener('error', (e) => {
console.error(e);
Expand All @@ -198,7 +198,8 @@ function streamDatasource(
});
eventSource.addEventListener('close', (e) => {
eventSource.close();
onStreamingEnd?.();
parser.close();
onStreamingEnd?.(parser.result);
});
}

Expand Down
17 changes: 13 additions & 4 deletions querybook/webapp/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ export class DeltaStreamParser {
this._currentValue = '';
this._isPartialKey = false;
}

public parse(delta: string): { [key: string]: string } {
this._parse(delta);
public get result() {
// make a copy of the result to avoid modifying the original result by the caller
return { ...this._result };
}

private _parse(delta: string): { [key: string]: string } {
public parse(delta: string) {
this._buffer += delta;
// This is to make sure we always have complete <@ and @> in the buffer
if (
Expand Down Expand Up @@ -96,4 +94,15 @@ export class DeltaStreamParser {

this._buffer = '';
}

public close() {
// flush the buffer if the stream has ended
if (this._buffer.length) {
if (!this._isPartialKey) {
this._currentValue += this._buffer;
this._result[this._currentKey] = this._currentValue.trimStart();
this._buffer = '';
}
}
}
}