- minimal dependencies: only
amqplib
anduuid
are needed - promises-based: async/await support makes it easy to use
- small api: 4 methods only, there's not much to learn 😉
- instantiation-able: the library itself is a Class ; get as many independent instance as needed
- events logging: choose your own log transport
- disconnections-proof: auto reconnect/resend when errors happen
This wrapper was tailored made for our microservices architecture.
We use RabbitMQ as the bus in the publish-subscribe pattern. A message is never sent directly to a queue, it is always sent to a configured exchange.
Every message should be acknowledged by returning a value : ACK
, NACK
, REJECT
(message.ACK
, message.NACK
or message.REJECT
in the message object).
A non-acknowledged message will be redelivered one time to the exchange. A rejected message will be directly deleted or redirected to the dead letter exchange if configured.
Our queues and exchanges are defined by a external engine. rab-q
doesn't provide API to create, assert or delete these.
$ npm install rab-q
const RabQ = require('rab-q');
const rabQ = new RabQ({
exchange: 'your_topic_exchange',
queues: 'queue_bind_to_exchange'
});
rabQ.on('error', err => {
console.error(err);
});
rabQ.on('log', log => {
console[log.level](log.msg, log.err || '');
});
rabQ.subscribesTo(/.*/, message => {
// Do stuff!
// Then acknowledge message by returning a constant
return message.ACK;
});
rabQ.start()
.then(() => {
rabQ.publish('yourRoutingKeyHere', {your: 'message'});
});
Type: string
Default: guest
User to connect on RabbitMQ.
Type: string
Default: guest
Password associate to username.
Type: amqp|amqps
Default: amqp
Connection protocol.
Type: string
Default: localhost
Hostname to trying to get connection.
Type: number
Default: 5672
Port of connection.
Type: object
Default: null
socketOptions for amqplib
https://amqp-node.github.io/amqplib/channel_api.html#connect
Type: string
Default: /
Selected virtual host.
Required
Type: string
Exchange to publish messages.
Type: Array<string>
string
Consumes messages from these queues.
If this option is omitted or has a falsey value, server will create random names.
Type: number
Default: 10
Defines the number of messages prefetched by channel. Once the max number of messages is reached, RabbitMQ will wait for some messages to be acknowledged before proceeding with new messages.
Type: number
Default: 0
Defines a delay in milliseconds before a message is rejected with NACK.
Type: number
(milliseconds)
Default: 0
Time in milliseconds before trying to reconnect when connection is lost.
Type: boolean
Default: false
Enables auto acknowledgment.
Type: boolean
Default: true
Enables auto reconnection if an error happens while connecting to the server.
Type: Object of function
Type: boolean
Default: return true
Function run before each message treatment. If it return a false value, the message is reject.
Type: function
Default: () => {}
Parameters: message
(see below)
Function run before each message treatment, can modify message.
Type: function
Default: () => {}
Parameters: message
(see below), subscriberResult
(ACK/NACK/REJECT)
Function run after each message treatment.
Type: function
Default: null
Parameters: routingKey
, content
, properties
(see publish method)
Must return an object with {routingKey, content, properties}
Function run before each publish call
Starts a connection.
Returns a promise resolved with true
for a successful connection or false
if a connection already exists.
Stops and closes a connection.
Publishes a message on exchange rabQ.exchange
.
Required
Type: string
A regular expression to match the routing keys of consumed messages.
Required
Type: Object
A message object to send to the exchange.
Type: Object
Default: {}
Adds RabbitMQ properties to the message. See http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish (options)
If properties does not have a headers
key, properties
is considered to be headers
.
You can provide a property x-query-token
in headers
to trace the lifecyle of a request. If not provided a new UUID will be generated.
Adds a Subscriber on consumed messages filtered by the routing key
Required
Type: RegExp
A regular expression to match on the routing keys of consumed messages.
Required
Type: Function
A function should acknowledge (or not) messages by returning either a value or a resolved promise with value. ('ACK'
, 'NACK'
or 'REJECT'
)
message
is a object with following properties:
- ACK
string
: the constant returned for a positive acknowledgment - NACK
string
: the constant returned for a negative acknowledgment. NACK will re-queuing message one time. - REJECT
string
: the constant returned for a rejection without redelivery. - content
Object
: content of the received message - rk
string
: routing key of the message - queue
string
: queue name where the message is consumed - token
string
: a UUID to identify the message - consumeAt
number
: timestamp when the message is consumed
Emits a log object with the following properties:
- level
string
: can beinfo
,warn
,error
- token
string
: the UUID to identify the message - msg
string
: the message, - err
Object
: the original error if log level is error
Emits an Error if something goes wrong with the connection. If you don't catch this event the process will sto