Skip to content

Commit

Permalink
Allow underlyingSource to be a stream
Browse files Browse the repository at this point in the history
  • Loading branch information
kasperisager committed Dec 12, 2024
1 parent 589d69f commit 14ad1ba
Showing 1 changed file with 21 additions and 17 deletions.
38 changes: 21 additions & 17 deletions web.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { Readable, getStreamError, isDisturbed } = require('streamx')
const { Readable, getStreamError, isStream, isDisturbed } = require('streamx')

// https://streams.spec.whatwg.org/#readablestreamdefaultreader
exports.ReadableStreamDefaultReader = class ReadableStreamDefaultReader {
Expand Down Expand Up @@ -97,24 +97,28 @@ exports.ReadableStreamDefaultController = class ReadableStreamDefaultController

// https://streams.spec.whatwg.org/#readablestream
exports.ReadableStream = class ReadableStream {
constructor(
underlyingSource = {},
queuingStrategy = new exports.CountQueuingStrategy(),
stream
) {
const { start, pull } = underlyingSource
const { highWaterMark = 1, size = defaultSize } = queuingStrategy
constructor(underlyingSource = {}, queuingStrategy) {
if (isStream(underlyingSource)) {
this._stream = underlyingSource
} else {
if (queuingStrategy === undefined) {
queuingStrategy = new exports.CountQueuingStrategy()
}

this._stream = stream || new Readable({ highWaterMark, byteLength: size })
const { start, pull } = underlyingSource
const { highWaterMark = 1, size = defaultSize } = queuingStrategy

this._controller = new exports.ReadableStreamDefaultController(this)
this._stream = new Readable({ highWaterMark, byteLength: size })

if (start) {
this._stream._open = open.bind(this, start.call(this, this._controller))
}
const controller = new exports.ReadableStreamDefaultController(this)

if (pull) {
this._stream._read = read.bind(this, pull)
if (start) {
this._stream._open = open.bind(this, start.call(this, controller))
}

if (pull) {
this._stream._read = read.bind(this, pull.bind(this, controller))
}
}
}

Expand Down Expand Up @@ -143,7 +147,7 @@ exports.ReadableStream = class ReadableStream {
}

static from(iterable) {
return new ReadableStream(undefined, undefined, Readable.from(iterable))
return new ReadableStream(Readable.from(iterable))
}
}

Expand All @@ -159,7 +163,7 @@ async function open(starting, cb) {

async function read(pull, cb) {
try {
await pull(this._controller)
await pull()

cb(null)
} catch (err) {
Expand Down

0 comments on commit 14ad1ba

Please sign in to comment.