Skip to content

Commit efb94bd

Browse files
author
Immanuel Pelzer
committed
Add AMQP based pub/sub
1 parent f7cbe76 commit efb94bd

32 files changed

+1553
-19
lines changed

.editorconfig

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
root = true
2+
3+
[*]
4+
indent_style = space
5+
indent_size = 2
6+
charset = utf-8
7+
trim_trailing_whitespace = true
8+
insert_final_newline = true

.gitignore

+24-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,27 @@
1+
.DS_Store
12
node_modules
2-
DS_Store
3-
.nyc_output
3+
/dist
44
.npmrc
5+
6+
# local env files
7+
.env.local
8+
.env.*.local
9+
10+
# Log files
11+
npm-debug.log*
12+
yarn-debug.log*
13+
yarn-error.log*
14+
15+
# Editor directories and files
16+
.idea
17+
.vscode
18+
*.suo
19+
*.ntvs*
20+
*.njsproj
21+
*.sln
22+
*.sw*
23+
.nyc_output
524
results.xml
6-
.nyc_output/
7-
.DS_Store
25+
26+
# npm packages
27+
*.tgz

.gitlab-ci.yml

+8
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,11 @@ include:
99
# If these break a user can manually go back and update the ref tag to a previous commit
1010
ref: master
1111
file: '/npm/0.1-template.yml'
12+
13+
variables:
14+
# Configure the rabbitmq server
15+
RABBITMQ_DEFAULT_USER: "guest"
16+
RABBITMQ_DEFAULT_PASS: "guest"
17+
18+
services:
19+
- rabbitmq:3.8-management-alpine

README.md

+119-8
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ Taube is a drop in replacement for cote. Without configuration it functions as a
1212
1. [Quick start guide](##Quick-start-guide)
1313
2. [Environment variables](#Environment-variables)
1414
3. [Migrate from cote](#Migrate-from-cote)
15-
4. [Readiness](#Readiness)
15+
4. [Monitoring and Signal Handling](#Monitoring-and-Signal-Handling)
1616
5. [Sockend](#Sockend)
17-
6. [Writing unit tests](#Writing-unit-tests)
17+
6. [Publisher/Subscriber](#Publisher/Subscriber)
18+
7. [Writing unit tests](#Writing-unit-tests)
1819

1920
## Quick start guide
2021

@@ -82,32 +83,59 @@ The `url` option needs to include `http` or `https` without a `/` at the end.
8283
| ------------------ |:----------------:| ---
8384
| TAUBE_HTTP_ENABLED | undefined / true | If set Taube will use HTTP instead of cote (axion). Set to true inside stack services.
8485
| TAUBE_HTTP_PORT | 4321 | Port of http server
85-
| TAUBE_HTTP_DEBUG | undefined | Adds debugging information to Taube (e.g. Boolean usedHttp to requesters send() responses)
86+
| ~~TAUBE_HTTP_DEBUG~~ | ~~undefined~~ | deprecated - ~~Adds debugging information to Taube (e.g. Boolean usedHttp to requesters send() responses)~~
87+
| TAUBE_DEBUG | undefined | Adds debugging information to Taube responses. See tests for usage. This does change responses and is only designed for development.
8688
| TAUBE_UNIT_TESTS | undefined | If set all requesters default their uri to <http://localhost>
8789
| TAUBE_RETRIES | 3 | Number of retries any Requester does before giving up. 3 is maximum value as retry duration would be over timeout.
8890
| TAUBE_COTE_DISABLED | undefined | If set, taube will not create cote components for responders and requesters
91+
| TAUBE_AMQP_ENABLED | undefined | If set Taube will use AMQP instead of cote (axion). Does not disable cote publishers sending data
92+
| TAUBE_AMQP_URI | undefined | AMQP uri (e.g. 'amqp://guest:guest@localhost')
93+
| TAUBE_AMQP_COTE_DISABLED | undefined |  If set, taube will not create cote components for Publishers and Subscribers
8994

9095

9196
## Migrate from cote
9297

98+
There is 3 modes you can run taube in while migrating from cote to taube.
99+
100+
1. Mode 1: Still use cote. Requesters/Responders and Publisher/Subscribers still use cote for communication
101+
2. Mode 2: Use taube, but still provide cote. Requesters will use HTTP and Subscribers AMQP. But Responders will still provide cote and Publishers will still publish using cote.
102+
3. Mode 3: Disable cote. cote components will no longer be created.
103+
104+
These settings can be tuned per component type:
105+
106+
| Type | Mode 1 | Mode 2 | Mode 3
107+
|:---:|:---:|:---:|:---:|
108+
| Requesters/Responders | by default | TAUBE_HTTP_ENABLED | TAUBE_COTE_DISABLED + TAUBE_HTTP_ENABLED
109+
| Publisher/Subscriber | by default | TAUBE_AMQP_ENABLED | TAUBE_AMQP_COTE_DISABLED + TAUBE_AMQP_ENABLED
110+
93111
The following is a proposed migration path:
94112

95113
1. Replace all `require('cote')` with `require('@cloud/taube')`
96114
2. Make sure your tests pass
97115
3. Pick a service
98116
4. Make sure it has a resolvable dns (e.g. add a Kubernetes service to it)
99-
5. Add the environment variable TAUBE_HTTP_ENABLED=true to the service
117+
5. Enable the taube services you want to use selectively. It is prefferable to activate one of the two options per iteration.
118+
1. For Requester/Responder HTTP: Add the environment variable TAUBE_HTTP_ENABLED=true to the service
119+
2. For AMQP Publisher/Subscribers: Initialize amqp pub/sub using the method described in [Publisher/Subscriber](#Publisher/Subscriber)
100120
6. Make sure your tests pass
101121
7. Go to 3 until no more services
102122

103-
## Readiness
123+
## Monitoring and Signal Handling
104124

105-
@infrastructure/observability can be used to get readiness checks for the taube http server.
125+
@infrastructure/observability can be used to get readiness/liveness checks and signal handling for the taube http server.
106126

107127
```javascript
108128
const observability = require('@infrastructure/observability')
109129

110-
observability.monitoring.addReadinessCheck(taube.monitoring.readinessCheck)
130+
observability.monitoring.observeServer(taube.http.server, taube.http.app)
131+
```
132+
133+
In order to gracefully handle Signal Handling and add liveness/readyness checks for AMQP, the following code can be used
134+
135+
```javascript
136+
const observability = require('@infrastructure/observability')
137+
138+
observability.monitoring.addOnSignalHook(taube.shutdown)
111139
```
112140

113141
## Sockend
@@ -229,8 +257,91 @@ socketNamespace.on('connection', function(socket) {
229257

230258
The Sockend component will not process any events of the 'data' event type and leave the process up to the custom handler.
231259

232-
## Writing unit tests
260+
## Publisher/Subscriber
261+
262+
The Publisher/Subscriber components can be used to connect to a AMQP enabled message broker. They provide the Publisher/Subscriber pattern to taube users.
263+
264+
To use these features you need to explicitly activate it using and TAUBE_AMQP_ENABLED and connect taube to a AMQP enabled message broker (e.g. RabbitMQ). `taube.init()` can be called multiple times. It only has an affect once.
265+
266+
```
267+
// Set TAUBE_AMQP_URI environment variable through your orchestration
268+
taube.init()
269+
// or pass directly
270+
taube.init({ amqp: { uri: process.env.TAUBE_AMQP_URI }})
271+
```
272+
273+
A subscriber can be setup to listen to all events of a topic type:
274+
275+
```
276+
const userSubscriber = new taube.Subscriber({ key: 'users' })
277+
278+
userSubscriber.on('users updated', async(data) => {
279+
...
280+
})
281+
```
282+
283+
A Publisher is used to publish the corresponding events:
284+
285+
```
286+
const publisher = new taube.Publisher({ key: 'users' })
287+
288+
publisher.publish(`users updated`, { data: {} })
289+
```
290+
291+
Every Publisher/Subscriber creates a Channel to RabbitMQ. There is a maximum number of channels per connection which is defined by your RabbitMQ [configuration](https://www.rabbitmq.com/configure.html). The default is 2047 per connection.
292+
293+
## Technical implementation details
294+
295+
Overview of the process between Publisher and Subscriber including the RabbitMQ concepts
296+
297+
```
298+
+-----------------------+-------------------------------------------+--------------------------+
299+
| Taube | RabbitMQ | Taube |
300+
| | | |
301+
| | | |
302+
| | +---------------+ | +------------------+ |
303+
| | topic a |temporary queue| channel | taube Subscriber | |
304+
| | +---------->+ -key +----+---->+ -key | |
305+
| | | | -topic a | | +------------------+ |
306+
| +---------------+ channel +---+----+ +---------------+ | |
307+
| |taube Publisher+-----+----->+exchange| | |
308+
| +---------------+ | | -key | | |
309+
| | +---+----+ | |
310+
| | | | |
311+
| | | +---------------+ channel +------------------+ |
312+
| | +---------->+temporary queue+----+---->+ taube Subscriber | |
313+
| | topic b | -key | | | -key | |
314+
| | topic a | -topic b | | +------------------+ |
315+
| | | -topic a | | |
316+
| | +---------------+ | |
317+
| | | |
318+
+-----------------------+-------------------------------------------+--------------------------+
319+
```
320+
321+
Concepts:
322+
323+
- exchange: An exchange is the place where the Publishers send their messages. Queues can "listen" on exchanges
324+
- queue: A queue of messages that listens on an exchange. In Pub/Sub we use non persistant, non worker queues, which function as Pub/Sub does in cote
325+
- channels: Multiple lightweight connections that share a single TCP connection between a process and RabbitMQ
326+
327+
Process:
328+
329+
After both the Publisher and Subscriber have registered their components a publish works like this:
330+
331+
1. Publisher sends message to exchange
332+
2. Exchange sends message too all queues that listen to that key and topic (key and route called in RabbitMQ)
333+
3. All queues trigger their consumers (listeners), which in this case is the taube Subscriber
334+
335+
## Writing unit tests for projects using taube
336+
337+
Currently does not support Publisher/Subscriber unit testing. Those unit tests will require a usable AMQP message broker connection initialized using `taube.init()`. You will need to call `taube.shutdown()` in
233338

234339
taube auto detects running in `NODE_ENV=test` and overwrites all requesters with `uri` = `http://localhost`. This means all Responders can easily be mocked. See `test/unit-test.test.js` for an example. It also uses a random port then which ensures that all Requesters and Responders in a process can only contact each other.
235340

236341
You can also force this by setting `TAUBE_UNIT_TESTS`
342+
343+
## Contributing to the taube project
344+
345+
In order to run the unit tests, you need to run `docker-compose up` inside `.test/`. Then run `npm run test-verbose` to run the unit tests.
346+
347+
This project has a unit test line coverage of 100% and everything below that fails the ci jobs.
+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
process.env.TAUBE_HTTP_ENABLED = true
2+
process.env.TAUBE_AMQP_URI = 'amqp://guest:guest@localhost'
3+
const taube = require('../../lib')
4+
5+
taube.init()
6+
7+
const publisher = new taube.Publisher({ key: 'users' })
8+
9+
const userSubscriber = new taube.Subscriber({ key: 'users' })
10+
11+
async function main() {
12+
await userSubscriber.on('users updated', (req) => {
13+
const { data } = req
14+
console.log(data)
15+
process.exit(0) // Only to make this example stop
16+
})
17+
18+
await publisher.publish(`users updated`, { data: {} })
19+
}
20+
21+
main().catch(err => {
22+
console.error(err)
23+
process.exit(1)
24+
})

examples/Setup/index.js

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
const observability = require('@infrastructure/observability')
2+
const taube = require('@cloud/taube')
3+
4+
// Observe the http server for healthchecks
5+
observability.monitoring.observeServer(taube.http.server, taube.http.app)
6+
// Shutdown all taube connections when shutting down service
7+
observability.monitoring.addOnSignalHook(async() => {
8+
await taube.shutdown()
9+
})

lib/amqp.js

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
const config = require('./config')
2+
var amqp = require('amqplib')
3+
4+
let conn
5+
let channels = []
6+
let errorHandler = defaultErrorHandler
7+
8+
async function init(options = {}) {
9+
if (!config.amqp.enabled) return Promise.resolve()
10+
if (conn) return Promise.resolve(conn)
11+
const uri = options.uri || config.amqp.uri
12+
if (!uri) {
13+
// eslint-disable-next-line max-len
14+
throw new Error('AMQP host URI needs to be defined either using init(uri) or TAUBE_AMQP_URI')
15+
}
16+
const promise = amqp.connect(uri, options)
17+
// Set to promise, so all other functions can await
18+
conn = promise
19+
try {
20+
conn = await promise
21+
} catch (error) {
22+
conn = null
23+
throw error
24+
}
25+
26+
// Passing an errorHandler is currently a private API
27+
errorHandler = options.errorHandler || defaultErrorHandler
28+
conn.on('error', errorHandler)
29+
conn.on('close', errorHandler)
30+
31+
return conn
32+
}
33+
34+
// Internal API for tests
35+
async function shutdownChannel(channel) {
36+
channel.removeAllListeners('error')
37+
channel.removeAllListeners('close')
38+
// Can fail in rare cases, where this function was called by tests before
39+
// as .close cannot be called multiple times
40+
await channel.close().catch((e) => {
41+
console.error(e)
42+
})
43+
}
44+
45+
async function shutdown() {
46+
await Promise.all(channels.map(channel => shutdownChannel(channel)))
47+
if (conn) {
48+
conn.removeAllListeners('error')
49+
conn.removeAllListeners('close')
50+
}
51+
conn = null
52+
channels = []
53+
}
54+
55+
// This function either returns a promise that can be awaited
56+
// which will now or in the future contain an active connection
57+
// or it fails if amqp has not been intialized
58+
function connection() {
59+
if (conn) return Promise.resolve(conn)
60+
throw new Error('AMQP needs to be initialized before usage. See taube README.md')
61+
}
62+
63+
// Get a channel to communicate to rabbitmq
64+
//
65+
// Also creates am exchange that is going to be used to
66+
// publish the messages to all listening queues
67+
//
68+
// Exchange: https://www.rabbitmq.com/tutorials/amqp-concepts.html#amqp-model
69+
// Channel: "lightweight connections that share a single TCP connection"
70+
// For more information: https://www.rabbitmq.com/channels.html
71+
async function channel() {
72+
const client = await connection()
73+
const channel = await client.createChannel()
74+
channel.on('error', errorHandler)
75+
channel.on('close', errorHandler)
76+
channels.push(channel)
77+
return channel
78+
}
79+
80+
// Private api only used to test channels
81+
function getChannels() {
82+
return channels
83+
}
84+
85+
// Private api
86+
function getErrorHandler() {
87+
return errorHandler
88+
}
89+
90+
91+
function defaultErrorHandler(error) {
92+
if (error) throw error
93+
throw new Error('amqp issue: connection issue')
94+
}
95+
96+
module.exports = {
97+
amqp,
98+
connection,
99+
shutdown,
100+
shutdownChannel,
101+
channel,
102+
getChannels,
103+
getErrorHandler,
104+
init
105+
}

0 commit comments

Comments
 (0)