Skip to content

Commit

Permalink
fix(Server): correct writable stream implement
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Jul 4, 2022
1 parent e7c395a commit 225bd13
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 33 deletions.
8 changes: 4 additions & 4 deletions packages/server/js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ class Connection {
this._body = null
this._reject_data = null
this._on_aborted = []
this._on_writable = null
this.aborted = false
this.upgraded = false
this.response.onAborted(() => {
this._on_aborted.forEach(call => call())
this.aborted = true
})
this.response.onWritable((offset) =>
this._on_writable ? this._on_writable(offset) : true)
this.wsContext = wsContext
this._remote_address = null
}
Expand Down Expand Up @@ -136,10 +139,7 @@ class Connection {
}

onWritable (callback) {
if (this.aborted) {
throw new ServerError({ code: 'CONNECTION_ABORTED' })
}
return this.response.onWritable(callback)
this._on_writable = callback
}

onAborted (callback) {
Expand Down
62 changes: 33 additions & 29 deletions packages/server/js/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ const { Writable } = require('stream')

const staticPath = path.resolve(process.cwd(), 'static')

const noop = () => {}

function toArrayBuffer (buffer) {
if (typeof buffer === 'object' && buffer.constructor.name === 'Buffer') {
return buffer.buffer.slice(buffer.byteOffset, buffer.byteOffset + buffer.byteLength)
Expand Down Expand Up @@ -182,25 +180,37 @@ class Response extends Writable {
}
}

_write (body, encoding, callback = noop) {
if (this._writableState.destroyed) {
return
}
this.writeHead()
if (body.length === 0) callback()
const data = !encoding && typeof body === 'string'
? body
: toArrayBuffer(Buffer.from(body, encoding))
const ok = this.connection.writeBody(data, this._totalSize)
if (this.connection.aborted) return callback()
this.connection.writeOffset = this.connection.getWriteOffset()
if (!ok) {
this.connection.onWritable((offset) => {
this._write(body.slice(offset - this.connection.writeOffset), encoding, callback)
this.emit('drain')
})
} else {
callback()
_write (chunk, encoding, callback) {
try {
this.writeHead()
if (!chunk || chunk.length === 0 || chunk.byteLength === 0) {
process.nextTick(callback)
}
const data = encoding === 'buffer' ? toArrayBuffer(chunk) : chunk
const initOffset = this.connection.getWriteOffset()
const ok = this.connection.writeBody(data, this._totalSize)
if (!ok) {
this.connection.onWritable((offset) => {
try {
const ok = this.connection.writeBody(data.slice(offset - initOffset), this._totalSize)
if (ok) {
process.nextTick(callback)
}
return ok
} catch (e) {
console.log(e)
callback(e)
return true
}
})
} else {
process.nextTick(callback)
}
return true
} catch (e) {
callback(e)
super.destroy()
return false
}
}

Expand All @@ -209,11 +219,6 @@ class Response extends Writable {
this.emit('error', error)
}

_pipeEnd () {
this.connection.end()
super.end()
}

_pipeFrom (readable, contentType) {
if (this._writableState.destroyed) {
return
Expand All @@ -234,7 +239,6 @@ class Response extends Writable {
this.setHeader('Content-Type', contentType)
}
readable.on('error', this._pipeError.bind(this))
readable.on('end', this._pipeEnd.bind(this))
// In RFC these status code must not have body
if (this._status < 200 || this._status === 204 || this._status === 304) {
throw new ServerError({
Expand Down Expand Up @@ -265,10 +269,10 @@ class Response extends Writable {
this.cork(() => {
this.writeHead()
this.connection.end(data)
super.end()
super.destroy()
})
} else {
super.end()
super.destroy()
}
}

Expand Down
1 change: 1 addition & 0 deletions test/prepare/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ module.exports = function (app) {
res.writeHead()
for (let i=0; i < 10; i++) {
res.write(`${i}`)
await delay(1)
}
res.end()
})
Expand Down

0 comments on commit 225bd13

Please sign in to comment.