Skip to content

Commit

Permalink
feat(Server): support read body later
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Jul 22, 2022
1 parent 2b674e0 commit 63639d3
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 76 deletions.
6 changes: 5 additions & 1 deletion benchmark/express-ws.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const express = require('express')
const Protocol = require('fast-ws-server/js/ws-protocol/fast-ws')
const Protocol = require('fast-ws-server/ws/fast-ws')
const { Readable } = require('stream')

const app = express()
Expand Down Expand Up @@ -35,6 +35,10 @@ app.get('/stream', (req, res) => {
stream.pipe(res)
})

app.post('/stream', (req, res) => {
req.pipe(res)
})

app.use('/', express.static('static'))

console.time('STARTUP')
Expand Down
8 changes: 8 additions & 0 deletions benchmark/fast-ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ app.get('/stream', (req, res) => {
stream.pipe(res)
})

app.post('/stream', (req, res) => {
req.bodyStream.pipe(res)
})

app.post('/stream/send', async (req, res) => {
res.send(await req.body)
})

app.serve('/')

console.time('STARTUP')
Expand Down
6 changes: 5 additions & 1 deletion benchmark/nanoexpress.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
const nanoexpress = require('nanoexpress')
const staticServe = require('@nanoexpress/middleware-static-serve/cjs')
const Protocol = require('fast-ws-server/js/ws-protocol/fast-ws')
const Protocol = require('fast-ws-server/ws/fast-ws')

const app = nanoexpress()

Expand Down Expand Up @@ -31,6 +31,10 @@ app.get('/hello/:name', async (req, res) => {
res.end(`Hello ${name}`)
})

app.post('/stream', (req, res) => {
res.send(req.body)
})

app.use('/', staticServe('./static'))

app.listen(3000)
3 changes: 3 additions & 0 deletions benchmark/post.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
wrk.method = "POST"
wrk.body = string.rep("-TEST_STRING-", 8192)
wrk.headers["Content-Type"] = "text/plain"
142 changes: 70 additions & 72 deletions packages/server/js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ const { cache, templateEngine, maxBodySize } = require('./constants')

const methodsWithBody = ['POST', 'PUT', 'PATCH', 'OPTIONS']

const emptyBuffer = Buffer.alloc(0)

class Connection {
constructor (app, request, response, wsContext) {
this.app = app
Expand All @@ -24,11 +22,13 @@ class Connection {
this.headers[k] = v
}
})
this.url = this.request.getUrl()
this.method = this.request.getMethod().toUpperCase()
this.rawQuery = this.request.getQuery()
this._req_info = {}
this._method = null
this._body = null
this._body_stream = null
this._bodyStream = null
this._reject_data = null
this._on_aborted = []
this._on_writable = null
Expand All @@ -42,25 +42,16 @@ class Connection {
this._on_writable ? this._on_writable(offset) : true)
this.wsContext = wsContext
this._remote_address = null
this.processBodyData()
}

static create (app, request, response, wsContext = null) {
return new Connection(app, request, response, wsContext)
}

bodyDataStream () {
if (!this.response) {
throw new ServerError({ code: 'SERVER_INVALID_CONNECTION' })
}
if (!methodsWithBody.includes(this.method)) {
throw new ServerError({
code: 'SERVER_INVALID_OPERATE',
message: `The method "${this.method}" should not have body.`
})
}
if (this._body_stream !== null) {
return this._body_stream
}
processBodyData () {
if (!this.response) return
if (!methodsWithBody.includes(this.method)) return
const contentLength = this.headers['content-length']
// Verify Content-Length
if (!contentLength) {
Expand All @@ -70,68 +61,83 @@ class Connection {
} else if (this.bodyLimit && Number(contentLength) > this.bodyLimit) {
throw new ServerError({ code: 'CLIENT_LENGTH_TOO_LARGE', message: '', httpCode: 413 })
}
const length = Number(contentLength || 0)
const chunks = []
let received = 0
let isEnd = false
let error = null
let callback = null
this._body_stream = new Readable({
read (size) {
if (error) {
this.destroy(error)
} else {
if (!chunks.length && !isEnd) {
callback = () => {
if (error) {
this.destroy(error)
} else {
const chunk = chunks.shift()
this.push(chunk)
}
}
} else {
const chunk = chunks.shift()
if (!chunk && isEnd) this.push(null)
else this.push(chunk || emptyBuffer)
}
}
}
})
this._body_stream.bodyLength = length
this.bodyLength = Number(contentLength || 0)
this._buffer = []
this._dataEnd = false
this._received = 0
this._onData = null
this._dataError = null
this.onAborted(() => {
isEnd = true
error = new ServerError({ code: 'CONNECTION_ABORTED' })
if (callback) {
callback()
this._dataEnd = true
this._dataError = new ServerError({ code: 'CONNECTION_ABORTED' })
if (this._onData) {
this._onData()
}
})
this.response.onData((chunk, isLast) => {
if (isEnd) return
received += chunk.byteLength
if (length && received > length) {
isEnd = true
error = new ServerError({ code: 'CLIENT_BAD_REQUEST', httpCode: 400 })
} else if (length && isLast && received < length) {
isEnd = true
error = new ServerError({ code: 'CLIENT_BAD_REQUEST', httpCode: 400 })
if (this._dataEnd) return
this._received += chunk.byteLength
if (this.bodyLength && this._received > this.bodyLength) {
this._dataEnd = true
this._dataError = new ServerError({ code: 'CLIENT_BAD_REQUEST', httpCode: 400 })
} else if (this.bodyLength && isLast && this._received < this.bodyLength) {
this._dataEnd = true
this._dataError = new ServerError({ code: 'CLIENT_BAD_REQUEST', httpCode: 400 })
} else {
chunks.push(Buffer.from(chunk))
isEnd = isLast
// Copy buffer to avoid memory release
this._buffer.push(Buffer.from(Buffer.from(chunk)))
this._dataEnd = isLast
}
if (callback) {
callback()
if (this._onData) {
this._onData()
}
})
return this._body_stream
}

bodyDataStream () {
if (!this.response) {
throw new ServerError({ code: 'SERVER_INVALID_CONNECTION' })
}
if (!methodsWithBody.includes(this.method)) {
throw new ServerError({
code: 'SERVER_INVALID_OPERATE',
message: `The method "${this.method}" should not have body.`
})
}
if (this._bodyStream !== null) {
return this._bodyStream
}
const readData = (callback) => {
if (this._dataError) {
callback(this._dataError)
} else {
if (!this._buffer.length && !this._dataEnd) {
this._onData = () => readData(callback)
} else {
this._onData = null
const chunk = this._buffer.shift()
if (!chunk && this._dataEnd) callback(null, null)
else callback(null, Buffer.from(chunk))
}
}
}
this._bodyStream = new Readable({
read () {
readData((err, chunk) => {
if (err) this.destroy(err)
else this.push(chunk)
})
}
})
this._bodyStream.bodyLength = this.bodyLength
return this._bodyStream
}

bodyData () {
if (this._body !== null) {
return this._body
}
const type = this.headers['content-type']
if (!type) return null
const type = this.headers['content-type'] || 'application/octet-stream'
const stream = this.bodyDataStream()
this._body = new Promise((resolve, reject) => {
let data = null
Expand Down Expand Up @@ -185,14 +191,6 @@ class Connection {
)
}

get url () {
return this.getInfo('url', () => this.request.getUrl())
}

get method () {
return this.getInfo('method', () => this.request.getMethod().toUpperCase())
}

getInfo (name, valueFn) {
if (!this._req_info[name]) this._req_info[name] = valueFn()
return this._req_info[name]
Expand Down
2 changes: 1 addition & 1 deletion packages/server/js/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class Response extends Writable {
}

send (data, contentType = null) {
if (!contentType && !this._headers['content-type']) {
if (!contentType && !this._headers['content-type'] && typeof data === 'string') {
this._headers['content-type'] = data.includes('<html>') ? 'text/html' : 'text/plain'
} else if (contentType) {
this._headers['content-type'] = contentType
Expand Down
3 changes: 2 additions & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"./request": "./js/request.js",
"./response": "./js/response.js",
"./error": "./js/errors.js",
"./ws-base": "./js/ws-protocol/basic.js"
"./ws-base": "./js/ws-protocol/basic.js",
"./ws/": "./js/ws-protocol/"
},
"repository": {
"type": "git",
Expand Down
16 changes: 16 additions & 0 deletions test/cases/http-pipe-body-stream-later-1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const axios = require('axios')

module.exports = async function ({ HTTP_PORT }) {
const body = '__TEST__STRING__'.repeat(4096)

const res = await axios.post(`http://localhost:${HTTP_PORT}/stream/body-later-1`, body)
if (res.status !== 200) {
throw new Error(`Response ${res.status}`)
}
if (res.headers['content-length'] !== body.length.toString()) {
throw new Error('Content-Length mismatch')
}
if (res.data !== body) {
throw new Error('Response data mismatch')
}
}
16 changes: 16 additions & 0 deletions test/cases/http-pipe-body-stream-later-2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const axios = require('axios')

module.exports = async function ({ HTTP_PORT }) {
const body = '__TEST__STRING__'.repeat(4096)

const res = await axios.post(`http://localhost:${HTTP_PORT}/stream/body-later-2`, body)
if (res.status !== 200) {
throw new Error(`Response ${res.status}`)
}
if (res.headers['content-length'] !== body.length.toString()) {
throw new Error('Content-Length mismatch')
}
if (res.data !== body) {
throw new Error('Response data mismatch')
}
}
9 changes: 9 additions & 0 deletions test/prepare/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ module.exports = function (app) {
req.bodyStream.pipe(res)
})

app.post('/stream/body-later-1', (req, res) => {
setTimeout(() => req.bodyStream.pipe(res), 100)
})

app.post('/stream/body-later-2', (req, res) => {
const stream = req.bodyStream
setTimeout(() => stream.pipe(res), 100)
})

app.get('/stream/error', (req, res) => {
const stream = new Stream.Readable({
read: () => '',
Expand Down

0 comments on commit 63639d3

Please sign in to comment.