Skip to content
This repository has been archived by the owner on Oct 28, 2022. It is now read-only.

Commit

Permalink
feat(mitm): support "mitm" behavior
Browse files Browse the repository at this point in the history
Supplying `{mitm: true}` to `connect()` in lieu of URL (or path) will
assume [mitm](https://npm.im/mitm) is in use.  A dummy port is handed
to `net.createConnection()`.

- swaps `pify` for `promwrap`
- improved test harnesses
- added pending tests where more coverage is needed
  • Loading branch information
boneskull committed Feb 2, 2018
1 parent 407cdf2 commit 1ee3fac
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 92 deletions.
11 changes: 6 additions & 5 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

const MQTT = require('mqtt');
const pify = require('pify');
const promisify = require('promwrap');
const net = require('net');
const {EventEmitter2} = require('eventemitter2');
const decoders = require('./decoders');
Expand All @@ -26,7 +26,7 @@ const asyncMethodNames = ['publish', 'subscribe', 'unsubscribe', 'end'];
*/
const toadpatch = (client, baseOpts = {}) => {
const asyncMethods = asyncMethodNames.reduce(
(acc, name) => Object.assign(acc, {[name]: pify(client[name])}),
(acc, name) => Object.assign(acc, {[name]: promisify(client[name])}),
{}
);

Expand Down Expand Up @@ -147,7 +147,7 @@ const toadpatch = (client, baseOpts = {}) => {

const normalizeOptions = (opts = {}, defaults = DEFAULT_OPTS) => {
[['decoder', decoders], ['encoder', encoders]].forEach(([prop, builtins]) => {
if (opts.hasOwnProperty(prop)) {
if (prop in opts) {
if (typeof opts[prop] === 'string') {
const value = builtins[opts[prop]];
if (!value) {
Expand Down Expand Up @@ -181,9 +181,10 @@ exports.connect = async (url, opts = {}) => {
opts = normalizeOptions(url);
args = [opts];
}
const path = opts.mitm ? 1833 : opts.path;
return new Promise((resolve, reject) => {
(opts.path
? MQTT.MqttClient(() => net.createConnection(opts.path), {
(path
? MQTT.MqttClient(() => net.createConnection(path), {
resubscribe: false
})
: MQTT.connect(...args)
Expand Down
37 changes: 36 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"dependencies": {
"eventemitter2": "^4.1.2",
"mqtt": "^2.15.1",
"pify": "^3.0.0"
"promwrap": "^2.1.0"
},
"files": [
"lib"
Expand All @@ -50,11 +50,13 @@
"get-port": "^3.2.0",
"husky": "^0.14.3",
"lint-staged": "^4.2.3",
"mitm": "^1.3.3",
"mocha": "^4.0.1",
"mqtt-connection": "^3.1.0",
"nyc": "^11.2.1",
"prettier-eslint-cli": "^4.4.0",
"semantic-release": "^8.0.3",
"stoppable": "^1.0.5",
"unexpected": "^10.36.0"
},
"engines": {
Expand Down
70 changes: 43 additions & 27 deletions test/harness/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,48 @@
const _ = require('lodash');
const {Server} = require('net');
const MqttConnection = require('mqtt-connection');
const MITM = require('mitm');
const promisify = require('promwrap');
const stoppable = require('stoppable');

class Broker extends Server {
class BaseServer extends Server {
constructor(listener) {
super();
this.on('connection', stream => {
this.emit('client', new MqttConnection(stream));
});

if (listener) {
this.on('client', listener);
}
}
}

exports.Broker = Broker;
class Broker extends BaseServer {
constructor(listener) {
super(listener);

this.on('connection', sock => {
this.emit('client', new MqttConnection(sock));
});
}
}

class MITMBroker extends BaseServer {
listen(ignored, done) {
this.mitm = MITM();
this.mitm.on('connection', sock => {
this.emit('client', new MqttConnection(sock));
});
process.nextTick(done);
}

exports.createBroker = async (port, transformers = {}) => {
close(done) {
this.mitm.disable();
process.nextTick(done);
}
}

exports.Broker = BaseServer;

exports.createBroker = async ({port, path, mitm, transformers = {}} = {}) => {
transformers = _.defaults(transformers, {
connack(...args) {
return {returnCode: 0};
Expand All @@ -35,7 +61,8 @@ exports.createBroker = async (port, transformers = {}) => {
pubrec: _.identity,
publish: _.identity
});
const broker = new Broker(client => {

const listener = client => {
client
.on('connect', (...args) => {
client.connack(transformers.connack(...args));
Expand Down Expand Up @@ -70,28 +97,17 @@ exports.createBroker = async (port, transformers = {}) => {
break;
}
});
})
.on('close', () => {
client.destroy();
})
.on('error', () => {
client.destroy();
})
.on('timeout', () => {
client.destroy();
})
.on('disconnect', () => {
client.destroy();
});
});
};

const broker = stoppable(new (mitm ? MITMBroker : Broker)(listener), 0);
broker.transformers = transformers;
broker.port = port;
return new Promise((resolve, reject) => {
broker.listen(port, err => {
if (err) {
return reject(err);
}
resolve(broker);
});
broker.path = path;
const promisifiedBroker = promisify(broker, {
exclude: ['unref', 'address', 'ref']
});

await promisifiedBroker.listen(port || path);
return promisifiedBroker;
};
Loading

0 comments on commit 1ee3fac

Please sign in to comment.