Skip to content

Commit

Permalink
Add Stream
Browse files Browse the repository at this point in the history
  • Loading branch information
lxsmnsyc committed Dec 5, 2023
1 parent f37faa5 commit a403955
Show file tree
Hide file tree
Showing 15 changed files with 502 additions and 28 deletions.
28 changes: 28 additions & 0 deletions packages/seroval/plugins/web/readable-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { createPlugin } from 'seroval';

export default createPlugin({
tag: 'seroval/plugins/web/readable-stream',
test(value) {
if (typeof ReadableStream === 'undefined') {
return false;
}
return value instanceof ReadableStream;
},
parse: {
sync(value, ctx, data) {

},
async(value, ctx, data) {

},
stream(value, ctx, data) {
return ctx.parse()
},
},
serialize(node, ctx, data) {

},
deserialize(node, ctx, data) {

},
});
25 changes: 25 additions & 0 deletions packages/seroval/src/core/base-primitives.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import type {
SerovalReferenceNode,
SerovalRegExpNode,
SerovalSetNode,
SerovalStreamConstructorNode,
SerovalStringNode,
SerovalTypedArrayNode,
SerovalWKSymbolNode,
Expand Down Expand Up @@ -457,3 +458,27 @@ export function createAsyncIteratorFactoryInstanceNode(
o: undefined,
};
}

export function createStreamConstructorNode(
id: number,
factory: SerovalNodeWithID,
init: SerovalNode,
): SerovalStreamConstructorNode {
return {
t: SerovalNodeType.StreamConstructor,
i: id,
s: undefined,
l: undefined,
c: undefined,
m: undefined,
p: undefined,
e: undefined,
a: [
factory,
init,
],
f: undefined,
b: undefined,
o: undefined,
};
}
4 changes: 4 additions & 0 deletions packages/seroval/src/core/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ export const enum SerovalNodeType {
AsyncIteratorFactory = 44,
AsyncIteratorFactoryInstance = 45,
ReadableStream = 46,
StreamConstructor = 47,
StreamNext = 48,
StreamThrow = 49,
StreamReturn = 50,
}

export const enum SerovalObjectFlags {
Expand Down
58 changes: 58 additions & 0 deletions packages/seroval/src/core/context/deserializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import type {
SerovalRequestNode,
SerovalResponseNode,
SerovalSetNode,
SerovalStreamConstructorNode,
SerovalStreamNextNode,
SerovalStreamReturnNode,
SerovalStreamThrowNode,
SerovalTypedArrayNode,
SerovalURLNode,
SerovalURLSearchParamsNode,
Expand All @@ -64,6 +68,8 @@ import { getTypedArrayConstructor } from '../utils/typed-array';
import type { Deferred, DeferredStream } from '../utils/deferred';
import { createDeferred, createDeferredStream } from '../utils/deferred';
import assert from '../utils/assert';
import type { Stream, StreamInit } from '../stream';
import { createStream } from '../stream';

function applyObjectFlag(obj: unknown, flag: SerovalObjectFlags): unknown {
switch (flag) {
Expand Down Expand Up @@ -527,6 +533,50 @@ export default abstract class BaseDeserializerContext implements PluginAccessOpt
);
}

private deserializeStreamConstructor(
node: SerovalStreamConstructorNode,
): unknown {
return this.assignIndexedValue(
node.i,
createStream(
this.deserialize(node.a[1]) as StreamInit,
),
);
}

private deserializeStreamNext(
node: SerovalStreamNextNode,
): unknown {
const deferred = this.refs.get(node.i) as Stream | undefined;
assert(deferred, new Error('Missing ReadableStream instance.'));
deferred.next(
this.deserialize(node.f),
);
return undefined;
}

private deserializeStreamThrow(
node: SerovalStreamThrowNode,
): unknown {
const deferred = this.refs.get(node.i) as Stream | undefined;
assert(deferred, new Error('Missing ReadableStream instance.'));
deferred.throw(
this.deserialize(node.f),
);
return undefined;
}

private deserializeStreamReturn(
node: SerovalStreamReturnNode,
): unknown {
const deferred = this.refs.get(node.i) as Stream | undefined;
assert(deferred, new Error('Missing ReadableStream instance.'));
deferred.return(
this.deserialize(node.f),
);
return undefined;
}

deserialize(node: SerovalNode): unknown {
switch (node.t) {
case SerovalNodeType.Constant:
Expand Down Expand Up @@ -615,6 +665,14 @@ export default abstract class BaseDeserializerContext implements PluginAccessOpt
return this.deserializeAsyncIteratorFactoryInstance(node);
case SerovalNodeType.ReadableStream:
return this.deserializeReadableStream(node);
case SerovalNodeType.StreamConstructor:
return this.deserializeStreamConstructor(node);
case SerovalNodeType.StreamNext:
return this.deserializeStreamNext(node);
case SerovalNodeType.StreamThrow:
return this.deserializeStreamThrow(node);
case SerovalNodeType.StreamReturn:
return this.deserializeStreamReturn(node);
case SerovalNodeType.SpecialReference:
case SerovalNodeType.IteratorFactory:
case SerovalNodeType.AsyncIteratorFactory:
Expand Down
20 changes: 20 additions & 0 deletions packages/seroval/src/core/context/parser/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
createAggregateErrorNode,
createIteratorFactoryInstanceNode,
createAsyncIteratorFactoryInstanceNode,
createStreamConstructorNode,
} from '../../base-primitives';
import { BIGINT_FLAG, Feature } from '../../compat';
import {
Expand Down Expand Up @@ -70,6 +71,7 @@ import type {
SerovalSetNode,
SerovalDataViewNode,
SerovalReadableStreamNode,
SerovalStreamConstructorNode,
} from '../../types';
import {
createURLNode,
Expand All @@ -80,6 +82,8 @@ import {
createReadableStreamNode,
} from '../../web-api';
import { SpecialReference, UNIVERSAL_SENTINEL } from '../../special-reference';
import type { Stream } from '../../stream';
import { isStream, toStreamInit } from '../../stream';

type ObjectLikeNode =
| SerovalObjectNode
Expand Down Expand Up @@ -496,13 +500,29 @@ export default abstract class BaseAsyncParserContext extends BaseParserContext {
);
}

private async parseStream(
id: number,
current: Stream,
): Promise<SerovalStreamConstructorNode> {
return createStreamConstructorNode(
id,
this.parseSpecialReference(SpecialReference.StreamConstructor),
await this.parse(
toStreamInit(current),
),
);
}

private async parseObject(
id: number,
current: object,
): Promise<SerovalNode> {
if (Array.isArray(current)) {
return this.parseArray(id, current);
}
if (isStream(current)) {
return this.parseStream(id, current);
}
const currentClass = current.constructor;
switch (currentClass) {
case Object:
Expand Down
Loading

0 comments on commit a403955

Please sign in to comment.