Skip to content

Commit

Permalink
BREAKING: drop old Edge, drop Node 10 (server only), remove upgradeRe…
Browse files Browse the repository at this point in the history
…q support

BREAKING: Drop old Edge
- Non Chromium versions of Edge are no longer supported

BREAKING: Drop Node 10 server support
- If you use require('simple-websocket/server') then we require Node 12 minmum for ES class property syntax. If you need earlier Node support, then continue relying on the last major version.

BREAKING: Remove upgradeReq support
- upgradeReq was removed from ws in an earlier version. See websockets/ws#1099 for how to re-add it back if it's needed

Server: Pass options through to SimpleWebSocket constructor
- For example, a server can be initialized with server = new Server({ port, encoding: 'utf8' }) to make each connection stream use utf8 encoding
  • Loading branch information
feross committed May 17, 2020
1 parent 19c0be9 commit ed7153e
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 90 deletions.
117 changes: 57 additions & 60 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* global WebSocket, DOMException */
/* global WebSocket */

const debug = require('debug')('simple-websocket')
const randombytes = require('randombytes')
Expand Down Expand Up @@ -55,7 +55,10 @@ class Socket extends stream.Duplex {
try {
if (typeof ws === 'function') {
// `ws` package accepts options
this._ws = new _WebSocket(opts.url, opts)
this._ws = new _WebSocket(opts.url, null, {
...opts,
encoding: undefined // encoding option breaks ws internals
})
} else {
this._ws = new _WebSocket(opts.url)
}
Expand All @@ -66,23 +69,19 @@ class Socket extends stream.Duplex {
}

this._ws.binaryType = 'arraybuffer'
this._ws.onopen = () => {
this._onOpen()
}
this._ws.onmessage = event => {
this._onMessage(event)
}
this._ws.onclose = () => {
this._onClose()
}
this._ws.onerror = () => {
this.destroy(new Error('connection error to ' + this.url))
}

this._onFinishBound = () => {
this._onFinish()
if (opts.socket && this.connected) {
queueMicrotask(() => this._handleOpen())
} else {
this._ws.onopen = () => this._handleOpen()
}
this.once('finish', this._onFinishBound)

this._ws.onmessage = event => this._handleMessage(event)
this._ws.onclose = () => this._handleClose()
this._ws.onerror = err => this._handleError(err)

this._handleFinishBound = () => this._handleFinish()
this.once('finish', this._handleFinishBound)
}

/**
Expand Down Expand Up @@ -117,8 +116,10 @@ class Socket extends stream.Duplex {
this._chunk = null
this._cb = null

if (this._onFinishBound) this.removeListener('finish', this._onFinishBound)
this._onFinishBound = null
if (this._handleFinishBound) {
this.removeListener('finish', this._handleFinishBound)
}
this._handleFinishBound = null

if (this._ws) {
const ws = this._ws
Expand All @@ -142,15 +143,7 @@ class Socket extends stream.Duplex {
}
this._ws = null

if (err) {
if (typeof DOMException !== 'undefined' && err instanceof DOMException) {
// Convert Edge DOMException object to Error object
const code = err.code
err = new Error(err.message)
err.code = code
}
this.emit('error', err)
}
if (err) this.emit('error', err)
this.emit('close')
cb()
}
Expand Down Expand Up @@ -179,32 +172,7 @@ class Socket extends stream.Duplex {
}
}

// When stream finishes writing, close socket. Half open connections are not
// supported.
_onFinish () {
if (this.destroyed) return

// Wait a bit before destroying so the socket flushes.
// TODO: is there a more reliable way to accomplish this?
const destroySoon = () => {
setTimeout(() => this.destroy(), 1000)
}

if (this.connected) {
destroySoon()
} else {
this.once('connect', destroySoon)
}
}

_onMessage (event) {
if (this.destroyed) return
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
this.push(data)
}

_onOpen () {
_handleOpen () {
if (this.connected || this.destroyed) return
this.connected = true

Expand Down Expand Up @@ -233,6 +201,41 @@ class Socket extends stream.Duplex {
this.emit('connect')
}

_handleMessage (event) {
if (this.destroyed) return
let data = event.data
if (data instanceof ArrayBuffer) data = Buffer.from(data)
this.push(data)
}

_handleClose () {
if (this.destroyed) return
this._debug('on close')
this.destroy()
}

_handleError (err) {
this.destroy(new Error(`Error connecting to ${this.url} (${err})`))
}

// When stream finishes writing, close socket. Half open connections are not
// supported.
_handleFinish () {
if (this.destroyed) return

// Wait a bit before destroying so the socket flushes.
// TODO: is there a more reliable way to accomplish this?
const destroySoon = () => {
setTimeout(() => this.destroy(), 1000)
}

if (this.connected) {
destroySoon()
} else {
this.once('connect', destroySoon)
}
}

_onInterval () {
if (!this._cb || !this._ws || this._ws.bufferedAmount > MAX_BUFFERED_AMOUNT) {
return
Expand All @@ -243,12 +246,6 @@ class Socket extends stream.Duplex {
cb(null)
}

_onClose () {
if (this.destroyed) return
this._debug('on close')
this.destroy()
}

_debug () {
const args = [].slice.call(arguments)
args[0] = '[' + this._id + '] ' + args[0]
Expand Down
4 changes: 4 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
},
"devDependencies": {
"airtap": "^3.0.0",
"babel-eslint": "^10.1.0",
"babel-minify": "^0.5.1",
"browserify": "^16.1.0",
"prettier-bytes": "^1.0.3",
Expand Down Expand Up @@ -51,5 +52,8 @@
"test-browser": "airtap -- test/*.js",
"test-browser-local": "airtap --local -- test/*.js",
"test-node": "tape test/*.js test/node/*.js"
},
"standard": {
"parser": "babel-eslint"
}
}
55 changes: 26 additions & 29 deletions server.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,54 @@ const WebSocketServer = require('ws').Server

class SocketServer extends events.EventEmitter {
constructor (opts) {
opts = Object.assign({
clientTracking: false,
perMessageDeflate: false
}, opts)

super()

this.destroyed = false

this._server = new WebSocketServer(opts)

this._onListeningBound = () => this._onListening()
this._server.on('listening', this._onListeningBound)
this.opts = {
clientTracking: false,
perMessageDeflate: false,
...opts
}

this._onConnectionBound = conn => this._onConnection(conn)
this._server.on('connection', this._onConnectionBound)
this.destroyed = false

this._onErrorBound = err => this._onError(err)
this._server.once('error', this._onErrorBound)
this._server = new WebSocketServer(this.opts)
this._server.on('listening', this._handleListening)
this._server.on('connection', this._handleConnection)
this._server.once('error', this._handleError)
}

address () {
return this._server.address()
}

close (cb) {
if (this.destroyed) return cb(new Error('server is closed'))
if (this.destroyed) {
if (cb) cb(new Error('server is closed'))
return
}
this.destroyed = true

if (cb) this.once('close', cb)

this._server.removeListener('listening', this._onListeningBound)
this._server.removeListener('connection', this._onConnectionBound)
this._server.removeListener('error', this._onErrorBound)
this._server.removeListener('listening', this._handleListening)
this._server.removeListener('connection', this._handleConnection)
this._server.removeListener('error', this._handleError)
this._server.on('error', () => {}) // suppress future errors

this._server.close(() => this.emit('close'))
this._server = null
}

_onListening () {
_handleListening = () => {
this.emit('listening')
}

_onConnection (conn) {
const socket = new Socket({ socket: conn })
socket._onOpen()
socket.upgradeReq = conn.upgradeReq
this.emit('connection', socket)
this.once('close', () => {
socket.upgradeReq = null
})
_handleConnection = (conn, req) => {
const socket = new Socket({ ...this.opts, socket: conn })
this.emit('connection', socket, req)
}

_onError (err) {
_handleError = (err) => {
this.emit('error', err)
this.close()
}
Expand Down
30 changes: 29 additions & 1 deletion test/node/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,53 @@ var Server = require('../../server')
var test = require('tape')

test('socket server', function (t) {
t.plan(3)
t.plan(5)

var port = 6789
var server = new Server({ port })

server.on('connection', function (socket) {
t.equal(typeof socket.read, 'function') // stream function is present
socket.on('data', function (data) {
t.ok(Buffer.isBuffer(data), 'type is buffer')
t.equal(data.toString(), 'ping')
socket.write('pong')
})
})

var client = new Socket('ws://localhost:' + port)
client.on('data', function (data) {
t.ok(Buffer.isBuffer(data), 'type is buffer')
t.equal(data.toString(), 'pong')

server.close()
client.destroy()
})
client.write('ping')
})

test('socket server, with custom encoding', function (t) {
t.plan(5)

var port = 6789
var server = new Server({ port, encoding: 'utf8' })

server.on('connection', function (socket) {
t.equal(typeof socket.read, 'function') // stream function is present
socket.on('data', function (data) {
t.equal(typeof data, 'string', 'type is string')
t.equal(data, 'ping')
socket.write('pong')
})
})

var client = new Socket({ url: 'ws://localhost:' + port, encoding: 'utf8' })
client.on('data', function (data) {
t.equal(typeof data, 'string', 'type is string')
t.equal(data, 'pong')

server.close()
client.destroy()
})
client.write('ping')
})

0 comments on commit ed7153e

Please sign in to comment.