diff --git a/lib/index.js b/lib/index.js index ed10fd6..067a6e2 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,7 +1,7 @@ 'use strict'; const MQTT = require('mqtt'); -const pify = require('pify'); +const promisify = require('promwrap'); const net = require('net'); const {EventEmitter2} = require('eventemitter2'); const decoders = require('./decoders'); @@ -26,7 +26,7 @@ const asyncMethodNames = ['publish', 'subscribe', 'unsubscribe', 'end']; */ const toadpatch = (client, baseOpts = {}) => { const asyncMethods = asyncMethodNames.reduce( - (acc, name) => Object.assign(acc, {[name]: pify(client[name])}), + (acc, name) => Object.assign(acc, {[name]: promisify(client[name])}), {} ); @@ -147,7 +147,7 @@ const toadpatch = (client, baseOpts = {}) => { const normalizeOptions = (opts = {}, defaults = DEFAULT_OPTS) => { [['decoder', decoders], ['encoder', encoders]].forEach(([prop, builtins]) => { - if (opts.hasOwnProperty(prop)) { + if (prop in opts) { if (typeof opts[prop] === 'string') { const value = builtins[opts[prop]]; if (!value) { @@ -181,9 +181,10 @@ exports.connect = async (url, opts = {}) => { opts = normalizeOptions(url); args = [opts]; } + const path = opts.mitm ? 1833 : opts.path; return new Promise((resolve, reject) => { - (opts.path - ? MQTT.MqttClient(() => net.createConnection(opts.path), { + (path + ? MQTT.MqttClient(() => net.createConnection(path), { resubscribe: false }) : MQTT.connect(...args) diff --git a/package-lock.json b/package-lock.json index 81688d5..9bd0a3c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1172,6 +1172,11 @@ "is-arrayish": "0.2.1" } }, + "es6-promisify": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/es6-promisify/-/es6-promisify-6.0.0.tgz", + "integrity": "sha512-8Tbqjrb8lC85dd81haajYwuRmiU2rkqNAFnlvQOJeeKqdUloIlI+JcUqeJruV4rCm5Y7oNU7jfs2FbmxhRR/2g==" + }, "escape-string-regexp": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/escape-string-regexp/-/escape-string-regexp-1.0.5.tgz", @@ -3238,6 +3243,15 @@ "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", "integrity": "sha1-o1AIsg9BOD7sH7kU9M1d95omQoQ=" }, + "mitm": { + "version": "1.3.3", + "resolved": "https://registry.npmjs.org/mitm/-/mitm-1.3.3.tgz", + "integrity": "sha512-b2+h4QIvW0gS8xluuXhgG6JXQYjuoxXVpACudXewK4c+7giwigN4UJ2hhWKQUwaT+jh9mQA5HYNfJ6t89Tq/uA==", + "dev": true, + "requires": { + "underscore": "1.5.2" + } + }, "mkdirp": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", @@ -5479,7 +5493,8 @@ "pify": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/pify/-/pify-3.0.0.tgz", - "integrity": "sha1-5aSs0sEB/fPZpNB/DbxNtJ3SgXY=" + "integrity": "sha1-5aSs0sEB/fPZpNB/DbxNtJ3SgXY=", + "dev": true }, "pinkie": { "version": "2.0.4", @@ -5742,6 +5757,14 @@ "integrity": "sha1-ihvjZr+Pwj2yvSPxDG/pILQ4nR8=", "dev": true }, + "promwrap": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/promwrap/-/promwrap-2.1.0.tgz", + "integrity": "sha512-/nDGWjctAFTroJUidYk1x2kxXrHFXdRVi3dw6O0DpFwYSA/WzZMZb+DxHW0wNQQJSqWu1Y8rtABeFEhJL57PVg==", + "requires": { + "es6-promisify": "6.0.0" + } + }, "proto-list": { "version": "1.2.4", "resolved": "https://registry.npmjs.org/proto-list/-/proto-list-1.2.4.tgz", @@ -6240,6 +6263,12 @@ "integrity": "sha1-15fhtVHKemOd7AI33G60u5vhfTU=", "dev": true }, + "stoppable": { + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/stoppable/-/stoppable-1.0.5.tgz", + "integrity": "sha512-qIu1V3hO/d0oS6ARFnsd+B8TvyP4LFjX1v3oFDL4xp3Zen/J7uY9h0WR+ptGzymOPJkk/31ZuHPW4Ml85i0LLw==", + "dev": true + }, "stream-consume": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/stream-consume/-/stream-consume-0.1.0.tgz", @@ -6785,6 +6814,12 @@ "resolved": "https://registry.npmjs.org/unc-path-regex/-/unc-path-regex-0.1.2.tgz", "integrity": "sha1-5z3T17DXxe2G+6xrCufYxqadUPo=" }, + "underscore": { + "version": "1.5.2", + "resolved": "https://registry.npmjs.org/underscore/-/underscore-1.5.2.tgz", + "integrity": "sha1-EzXF5PXm0zu7SwBrqMhqAPVW3gg=", + "dev": true + }, "underscore.string": { "version": "2.2.1", "resolved": "https://registry.npmjs.org/underscore.string/-/underscore.string-2.2.1.tgz", diff --git a/package.json b/package.json index 1e69f29..911b9e6 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "dependencies": { "eventemitter2": "^4.1.2", "mqtt": "^2.15.1", - "pify": "^3.0.0" + "promwrap": "^2.1.0" }, "files": [ "lib" @@ -50,11 +50,13 @@ "get-port": "^3.2.0", "husky": "^0.14.3", "lint-staged": "^4.2.3", + "mitm": "^1.3.3", "mocha": "^4.0.1", "mqtt-connection": "^3.1.0", "nyc": "^11.2.1", "prettier-eslint-cli": "^4.4.0", "semantic-release": "^8.0.3", + "stoppable": "^1.0.5", "unexpected": "^10.36.0" }, "engines": { diff --git a/test/harness/index.js b/test/harness/index.js index 25b53f5..dc4ec38 100644 --- a/test/harness/index.js +++ b/test/harness/index.js @@ -3,22 +3,48 @@ const _ = require('lodash'); const {Server} = require('net'); const MqttConnection = require('mqtt-connection'); +const MITM = require('mitm'); +const promisify = require('promwrap'); +const stoppable = require('stoppable'); -class Broker extends Server { +class BaseServer extends Server { constructor(listener) { super(); - this.on('connection', stream => { - this.emit('client', new MqttConnection(stream)); - }); + if (listener) { this.on('client', listener); } } } -exports.Broker = Broker; +class Broker extends BaseServer { + constructor(listener) { + super(listener); + + this.on('connection', sock => { + this.emit('client', new MqttConnection(sock)); + }); + } +} + +class MITMBroker extends BaseServer { + listen(ignored, done) { + this.mitm = MITM(); + this.mitm.on('connection', sock => { + this.emit('client', new MqttConnection(sock)); + }); + process.nextTick(done); + } -exports.createBroker = async (port, transformers = {}) => { + close(done) { + this.mitm.disable(); + process.nextTick(done); + } +} + +exports.Broker = BaseServer; + +exports.createBroker = async ({port, path, mitm, transformers = {}} = {}) => { transformers = _.defaults(transformers, { connack(...args) { return {returnCode: 0}; @@ -35,7 +61,8 @@ exports.createBroker = async (port, transformers = {}) => { pubrec: _.identity, publish: _.identity }); - const broker = new Broker(client => { + + const listener = client => { client .on('connect', (...args) => { client.connack(transformers.connack(...args)); @@ -70,28 +97,17 @@ exports.createBroker = async (port, transformers = {}) => { break; } }); - }) - .on('close', () => { - client.destroy(); - }) - .on('error', () => { - client.destroy(); - }) - .on('timeout', () => { - client.destroy(); - }) - .on('disconnect', () => { - client.destroy(); }); - }); + }; + + const broker = stoppable(new (mitm ? MITMBroker : Broker)(listener), 0); broker.transformers = transformers; broker.port = port; - return new Promise((resolve, reject) => { - broker.listen(port, err => { - if (err) { - return reject(err); - } - resolve(broker); - }); + broker.path = path; + const promisifiedBroker = promisify(broker, { + exclude: ['unref', 'address', 'ref'] }); + + await promisifiedBroker.listen(port || path); + return promisifiedBroker; }; diff --git a/test/mqttletoad.spec.js b/test/mqttletoad.spec.js index b27da27..38b1356 100644 --- a/test/mqttletoad.spec.js +++ b/test/mqttletoad.spec.js @@ -11,27 +11,52 @@ const path = require('path'); describe('mqttletoad', function() { let broker; + let client; let port; + afterEach(async function() { + if (client) { + try { + await client.end(); + } catch (ignored) {} + } + if (broker) { + try { + await broker.stop(); + } catch (ignored) {} + } + client = null; + broker = null; + }); + describe('method', function() { describe('connect()', function() { - describe('IPC', function() { - let client; + describe('MITM', function() { + this.slow(200); beforeEach(async function() { - broker = await createBroker( - path.join(os.tmpdir(), `mqttletoad-${Date.now()}`) - ); + broker = await createBroker({mitm: true}); }); - afterEach(function(done) { - client.end().then(() => { - broker.close(done); + it('should connect without port nor path', async function() { + client = await expect(connect({mitm: true}), 'to be fulfilled'); + }); + }); + + describe('IPC', function() { + this.slow(200); + + beforeEach(async function() { + broker = await createBroker({ + path: path.join(os.tmpdir(), `mqttletoad-${Date.now()}`) }); }); it('should allow connection via a path', async function() { - client = await connect({path: broker.port}); + client = await expect( + connect({path: broker.path}), + 'to be fulfilled' + ); }); }); @@ -43,27 +68,20 @@ describe('mqttletoad', function() { }); describe('when given a valid connection object', function() { - let client; - beforeEach(async function() { port = await getPort(); - broker = await createBroker(port); + broker = await createBroker({port}); }); it('should fulfill', async function() { - const promise = connect({ - host: 'localhost', - port, - protocol: 'mqtt' - }); - client = await expect(promise, 'to be fulfilled'); - return client.end(); - }); - - afterEach(function(done) { - client.end().then(() => { - broker.close(done); - }); + client = await expect( + connect({ + host: 'localhost', + port, + protocol: 'mqtt' + }), + 'to be fulfilled' + ); }); }); @@ -72,13 +90,10 @@ describe('mqttletoad', function() { beforeEach(async function() { port = await getPort(); - broker = await createBroker(port); - promise = connect(`mqtt://localhost:${port}`); - }); - - afterEach(function(done) { - promise.then(client => client.end()).then(() => { - broker.close(done); + broker = await createBroker({port}); + promise = connect(`mqtt://localhost:${port}`).then(c => { + client = c; + return c; }); }); @@ -100,54 +115,51 @@ describe('mqttletoad', function() { }); describe('upon subsequent connections', function() { - let client; - beforeEach(async function() { port = await getPort(); - broker = await createBroker(port); - client = await connect(`mqtt://localhost:${port}`); + broker = await createBroker({port}); + client = await connect(`mqtt://localhost:${port}`, { + // reduce this so we don't have to wait 1000ms for the + // reconnection attempt + reconnectPeriod: 20 + }); broker.transformers.connack = _ => ({ returnCode: 0, sessionPresent: true }); + // kills the underlying stream, which the client + // interprets as a remote disconnection client.stream.end(); // at this point, it should automatically reconnect }); - afterEach(function(done) { - client.end().then(() => { - broker.close(done); - }); - }); - - it('should update `sessionPresent` accordingly', function(done) { - client.once('connect', () => { - expect(client.sessionPresent, 'to be', true); - done(); - }); + it('should update `sessionPresent` accordingly', async function() { + await expect( + new Promise((resolve, reject) => { + client.once('connect', () => { + resolve(client); + }); + }), + 'to be fulfilled with value satisfying', + {sessionPresent: true} + ); }); }); }); }); }); - describe('mqttletoad client', function() { - let client; - let port; - let broker; + describe('decoders & encoders', function() { + it('should handle custom encoders and decoders'); + }); + describe('MQTT client behavior', function() { beforeEach(async function() { port = await getPort(); - broker = await createBroker(port); + broker = await createBroker({port}); client = await connect(`mqtt://localhost:${port}`); }); - afterEach(function(done) { - client.end().then(() => { - broker.close(done); - }); - }); - describe('publish()', function() { describe('encoder', function() { describe('when passed an unknown encoder', async function() { @@ -218,6 +230,10 @@ describe('mqttletoad', function() { }); describe('subscribe()', function() { + describe('error recovery', function() { + it('should remove event listener if subscription fails'); + }); + describe('invalid parameters', function() { describe('no parameters', function() { it('should reject', async function() {