Skip to content
This repository has been archived by the owner on Oct 28, 2022. It is now read-only.

Commit

Permalink
feat(ipc): add support for IPC connections
Browse files Browse the repository at this point in the history
  • Loading branch information
boneskull committed Feb 2, 2018
1 parent 119e0e9 commit 176f390
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 75 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ The following functions are promisified:
- `MqttClient#unsubscribe`
- `MqttClient#end`

### IPC Support

`mqttletoad` supports connecting to an MQTT broker running on a named pipe.

## Install

**Node.js v7.0.0 or greater required**.
Expand Down Expand Up @@ -201,6 +205,9 @@ const myfunc = async () => {

// disconnect
await client.end();

// IPC support (mqtt only; not ws)
const client = await toad.connect({path: '/path/to/my/named/pipe'});
}
```

Expand Down
18 changes: 11 additions & 7 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const MQTT = require('mqtt');
const pify = require('pify');
const net = require('net');
const {EventEmitter2} = require('eventemitter2');
const decoders = require('./decoders');
const encoders = require('./encoders');
Expand Down Expand Up @@ -177,17 +178,20 @@ exports.connect = async (url, opts = {}) => {
opts = normalizeOptions(opts);
args = [url, opts];
} else {
url = normalizeOptions(url);
args = [url];
opts = normalizeOptions(url);
args = [opts];
}
return new Promise((resolve, reject) => {
MQTT.connect(...args)
(opts.path
? MQTT.MqttClient(() => net.createConnection(opts.path))
: MQTT.connect(...args)
)
.on('connect', function(connack) {
/**
* If `false`, this is a clean session
* @public
* @memberOf client
*/
* If `false`, this is a clean session
* @public
* @memberOf client
*/
this.sessionPresent = Boolean(connack.sessionPresent);
})
.once('error', reject)
Expand Down
1 change: 1 addition & 0 deletions test/harness/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ exports.createBroker = async (port, transformers = {}) => {
});
});
broker.transformers = transformers;
broker.port = port;
return new Promise((resolve, reject) => {
broker.listen(port, err => {
if (err) {
Expand Down
166 changes: 98 additions & 68 deletions test/mqttletoad.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,96 +6,126 @@ const {MqttClient} = require('mqtt');
const {connect} = require('..');
const {createBroker} = require('./harness');
const getPort = require('get-port');
const os = require('os');
const path = require('path');

describe('mqttletoad', function() {
let broker;
let port;

describe('connect()', function() {
describe('when given no arguments', function() {
it('should reject', async function() {
return expect(connect(), 'to be rejected with', /invalid/i);
});
});
describe('method', function() {
describe('connect()', function() {
describe('IPC', function() {
let client;

describe('when given a valid connection object', function() {
let client;
beforeEach(async function() {
broker = await createBroker(
path.join(os.tmpdir(), `mqttletoad-${Date.now()}`)
);
});

beforeEach(async function() {
port = await getPort();
broker = await createBroker(port);
});
afterEach(function(done) {
client.end().then(() => {
broker.close(done);
});
});

it('should fulfill', async function() {
const promise = connect({host: 'localhost', port, protocol: 'mqtt'});
client = await expect(promise, 'to be fulfilled');
return client.end();
it('should allow connection via a path', async function() {
client = await connect({path: broker.port});
});
});

afterEach(function(done) {
client.end().then(() => {
broker.close(done);
describe('TCP', function() {
describe('when given no arguments', function() {
it('should reject', async function() {
return expect(connect(), 'to be rejected with', /invalid/i);
});
});
});
});

describe('upon first connection', function() {
let promise;
describe('when given a valid connection object', function() {
let client;

beforeEach(async function() {
port = await getPort();
broker = await createBroker(port);
promise = connect(`mqtt://localhost:${port}`);
});
beforeEach(async function() {
port = await getPort();
broker = await createBroker(port);
});

it('should fulfill', async function() {
const promise = connect({
host: 'localhost',
port,
protocol: 'mqtt'
});
client = await expect(promise, 'to be fulfilled');
return client.end();
});

afterEach(function(done) {
promise.then(client => client.end()).then(() => {
broker.close(done);
afterEach(function(done) {
client.end().then(() => {
broker.close(done);
});
});
});
});

it('should resolve with the wrapped MqttClient once connected', async function() {
return expect(
promise,
'when fulfilled',
expect.it('to be a', MqttClient)
);
});
describe('upon first connection', function() {
let promise;

it('should assign `sessionPresent` property', async function() {
return expect(
promise,
'when fulfilled',
expect.it('to have property', 'sessionPresent', false)
);
});
});
beforeEach(async function() {
port = await getPort();
broker = await createBroker(port);
promise = connect(`mqtt://localhost:${port}`);
});

describe('upon subsequent connections', function() {
let client;
afterEach(function(done) {
promise.then(client => client.end()).then(() => {
broker.close(done);
});
});

beforeEach(async function() {
port = await getPort();
broker = await createBroker(port);
client = await connect(`mqtt://localhost:${port}`);
broker.transformers.connack = _ => ({
returnCode: 0,
sessionPresent: true
});
client.stream.end();
// at this point, it should automatically reconnect
});
it('should resolve with the wrapped MqttClient once connected', async function() {
return expect(
promise,
'when fulfilled',
expect.it('to be a', MqttClient)
);
});

afterEach(function(done) {
client.end().then(() => {
broker.close(done);
it('should assign `sessionPresent` property', async function() {
return expect(
promise,
'when fulfilled',
expect.it('to have property', 'sessionPresent', false)
);
});
});
});

it('should update `sessionPresent` accordingly', function(done) {
client.once('connect', () => {
expect(client.sessionPresent, 'to be', true);
done();
describe('upon subsequent connections', function() {
let client;

beforeEach(async function() {
port = await getPort();
broker = await createBroker(port);
client = await connect(`mqtt://localhost:${port}`);
broker.transformers.connack = _ => ({
returnCode: 0,
sessionPresent: true
});
client.stream.end();
// at this point, it should automatically reconnect
});

afterEach(function(done) {
client.end().then(() => {
broker.close(done);
});
});

it('should update `sessionPresent` accordingly', function(done) {
client.once('connect', () => {
expect(client.sessionPresent, 'to be', true);
done();
});
});
});
});
});
Expand Down

0 comments on commit 176f390

Please sign in to comment.