Skip to content

Commit

Permalink
fix: delta stream parser (#1280)
Browse files Browse the repository at this point in the history
* fix: delta stream  parser

* comment
  • Loading branch information
jczhong84 authored Jun 23, 2023
1 parent 803ae8b commit b4da637
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 28 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "querybook",
"version": "3.25.0",
"version": "3.25.1",
"description": "A Big Data Webapp",
"private": true,
"scripts": {
Expand Down
64 changes: 48 additions & 16 deletions querybook/webapp/__tests__/lib/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,56 @@ import { DeltaStreamParser } from 'lib/stream';
describe('DeltaStreamParser', () => {
it('Works for stream without key/value pairs', () => {
const parser = new DeltaStreamParser();
expect(parser.parse('some data')).toEqual({
parser.parse('some data');
expect(parser.result).toEqual({
data: 'some data',
});
expect(parser.parse('\nsome more data')).toEqual({
parser.parse('\nsome more data');
expect(parser.result).toEqual({
data: 'some data\nsome more data',
});
});

it('Works for stream ending with non empty buffer', () => {
const parser = new DeltaStreamParser();
parser.parse('201');
parser.parse('9');
expect(parser.result).toEqual({
data: '201',
});
parser.close();
expect(parser.result).toEqual({
data: '2019',
});
});

it('Works for stream with both data and key/value pairs', () => {
const parser = new DeltaStreamParser();
parser.parse('some data');
expect(parser.parse('\n<@some_key@>\nsome value')).toEqual({
parser.parse('\n<@some_key@>\nsome value');
expect(parser.result).toEqual({
data: 'some data\n',
some_key: 'some value',
});
});

it('Works for stream with only key/value pairs', () => {
const parser = new DeltaStreamParser();
expect(parser.parse('<@some_key@>')).toEqual({

parser.parse('<@some_key@>');
expect(parser.result).toEqual({
data: '',
some_key: '',
});
expect(parser.parse('\nsome value\n')).toEqual({

parser.parse('\nsome value\n');
expect(parser.result).toEqual({
data: '',
some_key: 'some value\n',
});
expect(parser.parse('<@another_key@>\nanother value')).toEqual({

parser.parse('<@another_key@>\nanother value');
expect(parser.result).toEqual({
data: '',
some_key: 'some value\n',
another_key: 'another value',
Expand All @@ -39,44 +61,54 @@ describe('DeltaStreamParser', () => {

it('Works for partial stream', () => {
const parser = new DeltaStreamParser();
expect(parser.parse('some da')).toEqual({
parser.parse('some da');
expect(parser.result).toEqual({
data: 'some da',
});
// wait for <@ to be complete before parsing
expect(parser.parse('ta<')).toEqual({
parser.parse('ta<');
expect(parser.result).toEqual({
data: 'some da',
});
// the next char is not @, so it will be treated as data
expect(parser.parse('ta')).toEqual({
parser.parse('ta');
expect(parser.result).toEqual({
data: 'some data<ta',
});

// wait for <@ to be complete before parsing
expect(parser.parse('<')).toEqual({
parser.parse('<');
expect(parser.result).toEqual({
data: 'some data<ta',
});
// the next char is @, so the following will be treated as key
expect(parser.parse('@som')).toEqual({
parser.parse('@som');
expect(parser.result).toEqual({
data: 'some data<ta',
});
// wait for @> to be complete before parsing
expect(parser.parse('e_key@')).toEqual({
parser.parse('e_key@');
expect(parser.result).toEqual({
data: 'some data<ta',
});
expect(parser.parse('>\n')).toEqual({
parser.parse('>\n');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: '',
});
expect(parser.parse('som')).toEqual({
parser.parse('som');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: 'som',
});
expect(parser.parse('e value')).toEqual({
parser.parse('e value');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: 'some value',
});
// ignore the last incomplete key
expect(parser.parse('<@ano')).toEqual({
parser.parse('<@ano');
expect(parser.result).toEqual({
data: 'some data<ta',
some_key: 'some value',
});
Expand Down
4 changes: 2 additions & 2 deletions querybook/webapp/components/QueryCellTitle/QueryCellTitle.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export const QueryCellTitle: React.FC<IQueryCellTitleProps> = ({
onChange(title);
}, [title]);

const handleOnClick = useCallback(() => {
const handleTitleGenerationClick = useCallback(() => {
startStream();
trackClick({
component: ComponentType.AI_ASSISTANT,
Expand All @@ -61,7 +61,7 @@ export const QueryCellTitle: React.FC<IQueryCellTitleProps> = ({
size={18}
tooltip="AI: generate title"
color={!value && query ? 'accent' : undefined}
onClick={handleOnClick}
onClick={handleTitleGenerationClick}
/>
)}
<ResizableTextArea
Expand Down
3 changes: 2 additions & 1 deletion querybook/webapp/hooks/useStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ export function useStream(
setSteamStatus(StreamStatus.STREAMING);
setData({});

ds.stream(url, params, setData, () => {
ds.stream(url, params, setData, (data) => {
setData(data);
setSteamStatus(StreamStatus.FINISHED);
});
}, [url, params]);
Expand Down
9 changes: 5 additions & 4 deletions querybook/webapp/lib/datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,16 @@ function streamDatasource(
url: string,
params?: Record<string, unknown>,
onStreaming?: (data: { [key: string]: string }) => void,
onStreamingEnd?: () => void
onStreamingEnd?: (data?: { [key: string]: string }) => void
) {
const eventSource = new EventSource(
`${url}?params=${JSON.stringify(params)}`
);
const parser = new DeltaStreamParser();
eventSource.addEventListener('message', (e) => {
const newToken = JSON.parse(e.data).data;
const data = parser.parse(newToken);
onStreaming?.(data);
parser.parse(newToken);
onStreaming?.(parser.result);
});
eventSource.addEventListener('error', (e) => {
console.error(e);
Expand All @@ -198,7 +198,8 @@ function streamDatasource(
});
eventSource.addEventListener('close', (e) => {
eventSource.close();
onStreamingEnd?.();
parser.close();
onStreamingEnd?.(parser.result);
});
}

Expand Down
17 changes: 13 additions & 4 deletions querybook/webapp/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@ export class DeltaStreamParser {
this._currentValue = '';
this._isPartialKey = false;
}

public parse(delta: string): { [key: string]: string } {
this._parse(delta);
public get result() {
// make a copy of the result to avoid modifying the original result by the caller
return { ...this._result };
}

private _parse(delta: string): { [key: string]: string } {
public parse(delta: string) {
this._buffer += delta;
// This is to make sure we always have complete <@ and @> in the buffer
if (
Expand Down Expand Up @@ -96,4 +94,15 @@ export class DeltaStreamParser {

this._buffer = '';
}

public close() {
// flush the buffer if the stream has ended
if (this._buffer.length) {
if (!this._isPartialKey) {
this._currentValue += this._buffer;
this._result[this._currentKey] = this._currentValue.trimStart();
this._buffer = '';
}
}
}
}

0 comments on commit b4da637

Please sign in to comment.