Skip to content

Commit

Permalink
feat(Server): support for body stream
Browse files Browse the repository at this point in the history
  • Loading branch information
hans00 committed Jul 22, 2022
1 parent 88068f1 commit 3bf1a07
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 44 deletions.
7 changes: 6 additions & 1 deletion docs/Request.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
## `body`

> Get body, return Promised.
> Get body, return `Promise<any>`.
> Only `POST`, `PUT`, `PATCH` can receive body.
> Will auto parse data.
## `bodyStream`

> Get body stream, return `Readable<Buffer>`.
> Only `POST`, `PUT`, `PATCH` can receive body.
## `url`

> Request path.
Expand Down
146 changes: 103 additions & 43 deletions packages/server/js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ const qs = require('qs')
const iconv = require('iconv-lite')
const contentType = require('content-type')
const multipart = require('multipart-formdata')
const { Readable } = require('stream')
const ServerError = require('./errors')
const utils = require('./utils')
const { cache, templateEngine, maxBodySize } = require('./constants')

const methodsWithBody = ['POST', 'PUT', 'PATCH', 'OPTIONS']

const emptyBuffer = Buffer.alloc(0)

class Connection {
constructor (app, request, response, wsContext) {
this.app = app
Expand All @@ -25,6 +28,7 @@ class Connection {
this._req_info = {}
this._method = null
this._body = null
this._body_stream = null
this._reject_data = null
this._on_aborted = []
this._on_writable = null
Expand All @@ -44,62 +48,118 @@ class Connection {
return new Connection(app, request, response, wsContext)
}

bodyData () {
bodyDataStream () {
if (!this.response) {
return null
throw new ServerError({ code: 'SERVER_INVALID_CONNECTION' })
}
if (!methodsWithBody.includes(this.method)) {
return null
}
if (this._body !== null) {
return this._body
throw new ServerError({
code: 'SERVER_INVALID_OPERATE',
message: `The method "${this.method}" should not have body.`
})
}
const _contentType = this.headers['content-type']
const _contentLength = this.headers['content-length']
// exit if no Content-Type
if (!_contentType) {
return null
if (this._body_stream !== null) {
return this._body_stream
}
const contentLength = this.headers['content-length']
// Verify Content-Length
if (!_contentLength) {
if (!contentLength) {
throw new ServerError({ code: 'CLIENT_NO_LENGTH', message: '', httpCode: 411 })
} else if (!/^[1-9]\d*$/.test(_contentLength)) {
} else if (!/^[1-9]\d*$/.test(contentLength)) {
throw new ServerError({ code: 'CLIENT_LENGTH_INVALID', message: '', httpCode: 400 })
} else if (this.bodyLimit && Number(_contentLength) > this.bodyLimit) {
} else if (this.bodyLimit && Number(contentLength) > this.bodyLimit) {
throw new ServerError({ code: 'CLIENT_LENGTH_TOO_LARGE', message: '', httpCode: 413 })
}
this._body = new Promise((resolve, reject) => {
this.onAborted(() => reject(new ServerError({ code: 'CONNECTION_ABORTED' })))
const contentLength = Number(_contentLength)
let data = null; let bodyLength = 0
this.response.onData((chunk, isLast) => {
data = data !== null ? Buffer.concat([data, Buffer.from(chunk)]) : Buffer.concat([Buffer.from(chunk)])
bodyLength += chunk.byteLength
if (bodyLength >= contentLength) {
try {
const contentData = data.slice(0, contentLength)
const content = contentType.parse(_contentType)
// In RFC, charset default is ISO-8859-1, and it equal to latin1
const charset = content.parameters.charset || 'latin1'
if (content.type.startsWith('text/')) {
resolve(iconv.decode(contentData, charset))
} else if (content.type === 'application/json') {
resolve(JSON.parse(iconv.decode(contentData, charset)))
} else if (content.type === 'application/x-www-form-urlencoded') {
resolve(qs.parse(iconv.decode(contentData, charset)))
} else if (content.type === 'multipart/form-data') {
if (!content.parameters.boundary) {
throw new Error('NO_BOUNDARY')
const length = Number(contentLength || 0)
const chunks = []
let received = 0
let isEnd = false
let error = null
let callback = null
this._body_stream = new Readable({
read (size) {
if (error) {
this.destroy(error)
} else {
if (!chunks.length && !isEnd) {
callback = () => {
if (error) {
this.destroy(error)
} else {
const chunk = chunks.shift()
this.push(chunk)
}
resolve(multipart.parse(contentData, content.parameters.boundary))
} else {
resolve(contentData)
}
} catch (e) {
reject(new ServerError({ code: 'SERVER_BODY_PARSE', originError: e, httpCode: 400 }))
} else {
const chunk = chunks.shift()
if (!chunk && isEnd) this.push(null)
else this.push(chunk || emptyBuffer)
}
}
}
})
this._body_stream.bodyLength = length
this.onAborted(() => {
isEnd = true
error = new ServerError({ code: 'CONNECTION_ABORTED' })
if (callback) {
callback()
}
})
this.response.onData((chunk, isLast) => {
if (isEnd) return
received += chunk.byteLength
if (length && received > length) {
isEnd = true
error = new ServerError({ code: 'CLIENT_BAD_REQUEST', httpCode: 400 })
} else if (length && isLast && received < length) {
isEnd = true
error = new ServerError({ code: 'CLIENT_BAD_REQUEST', httpCode: 400 })
} else {
chunks.push(Buffer.from(chunk))
isEnd = isLast
}
if (callback) {
callback()
}
})
return this._body_stream
}

bodyData () {
if (this._body !== null) {
return this._body
}
const type = this.headers['content-type']
if (!type) return null
const stream = this.bodyDataStream()
this._body = new Promise((resolve, reject) => {
let data = null
stream.on('error', reject)
stream.on('data', (chunk) => {
data = data !== null ? Buffer.concat([data, chunk]) : chunk
})
stream.on('end', () => {
try {
const content = contentType.parse(type)
// In RFC, charset default is ISO-8859-1, and it equal to latin1
const charset = content.parameters.charset || 'latin1'
if (content.type.startsWith('text/')) {
resolve(iconv.decode(data, charset))
} else if (content.type === 'application/json') {
resolve(JSON.parse(iconv.decode(data, charset)))
} else if (content.type === 'application/x-www-form-urlencoded') {
resolve(qs.parse(iconv.decode(data, charset)))
} else if (content.type === 'multipart/form-data') {
if (!content.parameters.boundary) {
throw new Error('NO_BOUNDARY')
}
resolve(multipart.parse(data, content.parameters.boundary))
} else {
resolve(data)
}
} else if (isLast) {
reject(new ServerError({ code: 'SERVER_BODY_LENGTH', httpCode: 400 }))
} catch (e) {
reject(new ServerError({ code: 'SERVER_BODY_PARSE', originError: e, httpCode: 400 }))
}
})
})
Expand Down
4 changes: 4 additions & 0 deletions packages/server/js/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class Request {
return this.connection.bodyData()
}

get bodyStream () {
return this.connection.bodyDataStream()
}

get hostname () {
const headers = this.connection.headers
return headers.host || headers['x-forwarded-host'] || os.hostname()
Expand Down
2 changes: 2 additions & 0 deletions packages/server/js/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ class Response extends Writable {
if (!contentType) {
contentType = mime.lookup(readable.path) || 'application/octet-stream'
}
} else if (readable.bodyLength) { // Known size body
this._totalSize = readable.bodyLength
}
if (!this.getHeader('content-type') && contentType) {
this.setHeader('Content-Type', contentType)
Expand Down
16 changes: 16 additions & 0 deletions test/cases/http-pipe-body-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
const axios = require('axios')

module.exports = async function ({ HTTP_PORT }) {
const body = '__TEST__STRING__'.repeat(4096)

const res = await axios.post(`http://localhost:${HTTP_PORT}/stream/body`, body)
if (res.status !== 200) {
throw new Error(`Response ${res.status}`)
}
if (res.headers['content-length'] !== body.length.toString()) {
throw new Error('Content-Length mismatch')
}
if (res.data !== body) {
throw new Error('Response data mismatch')
}
}
4 changes: 4 additions & 0 deletions test/prepare/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ module.exports = function (app) {
Stream.Readable.from(gen()).pipe(res)
})

app.post('/stream/body', (req, res) => {
req.bodyStream.pipe(res)
})

app.get('/stream/error', (req, res) => {
const stream = new Stream.Readable({
read: () => '',
Expand Down

0 comments on commit 3bf1a07

Please sign in to comment.