Messaging npm module for FFC services
npm install --save ffc-messaging
name
- name of connection, if not supplied the address name will be used. This value is also used in App Insights tracing
host
- Azure Service Bus namespace, for example, myservicebus.servicebus.windows.net
useCredentialChain
- Boolean value for whether to authenticate connection with using Azure's credential chain. For example, set this to true if you wish to use AAD Pod Identity. If false
, then username
and password
or connectionString
are required.
connectionString
- Azure Service Bus connection string. If provided, username
and password
are ignored.
username
- Azure Service Bus Shared Access Key name for authentication. Not required if useCredentialChain
is true
or connectionString
is provided.
password
- Azure Service Bus Shared Access Key value for authentication. Not required if useCredentialChain
is true
or connectionString
is provided.
type
- Azure Service Bus entity to connect to, allows queue
, sessionQueue
, topic
or subscription
.
address
- Name of the Azure Service Bus queue, topic or subscription to connect to.
topic
- Required for subscription connections only. The name of the topic the subscription belongs to.
appInsights
- Application Insights module if logging is required
retries
- How many times should a sender try to send a message, defaulting to 5
if not supplied. With Pod Identity and Azure Identity there is a scenario that the identity will not be allocated in time for it's usage which causes failure sending messages. 5
is usually sufficient but can be increased if necessary.
retryWaitInMs
- How long should a sender wait in milliseconds before trying to resend, defaulting to 500
if not supplied.
exponentialRetry
- Whether to exponentially retry, ie doubling the retryWaitInMs
on every retry. Defaulted to false
.
autoCompleteMessages
- (Subscriptions only - see below) Whether to auto complete messages once the action method has ran. Defaults to false
.
maxConcurrentCalls
- (Subscriptions only - see below) Maximum number of messages received from message handler without being settled. Defaults to 1
.
const config = {
host: 'myservicebus.servicebus.windows.net',
useCredentialChain: false,
username: 'mySharedAccessKeyName',
password: 'mySharedAccessKey,
address: 'mySubscription,
type: 'subscription',
topic: 'myTopic',
appInsights: require('applicationinsights'),
retries: 5
}
Message objects must follow the below structure.
body
- The body of the message.
type
- Type of message using reverse DNS notation. For example, uk.gov.demo.claim.validated
.
source
- Name of the service sending the message. For example, ffc-demo-claim-service
.
metadata
- Optional object containing metadata to be added to message.
In addition, any property as described in the Microsoft documentation.
const message = {
body: { claimId: 1 },
type: 'uk.gov.demo.claim.validated',
subject: 'New Claim',
source: 'ffc-demo-claim-service',
metadata: {
myCustomProperty: 'my-value'
}
}
const sender = new MessageSender(config)
await sender.sendMessage(message)
// shutdown when needed
await sender.closeConnection()
The sendMessage
function can also receive all options applicable to Azure Service Bus sendMessages
as a parameter, see Azure documentation.
await sender.sendMessage(message, options)
const messages = [{
body: { claimId: 1 },
type: 'uk.gov.demo.claim.validated',
subject: 'New Claim 1',
source: 'ffc-demo-claim-service'
},
{
body: { claimId: 2 },
type: 'uk.gov.demo.claim.validated',
subject: 'New Claim 2',
source: 'ffc-demo-claim-service'
}]
const sender = new MessageBatchSender(config)
await sender.sendBatchMessages(messages)
// shutdown when needed
await sender.closeConnection()
The sendBatchMessages
function can also receive all options applicable to Azure Service Bus sendMessages
as a parameter, see Azure documentation.
await sender.sendBatchMessages(message, options)
There are multiple options for receiving a message.
Permanently subscribe to all messages. Automatically will handle any intermittent disconnects.
const action = function (message) {
console.log(message.body)
}
const receiver = new MessageReceiver(config, action)
await receiver.subscribe()
// shutdown when needed
await receiver.closeConnection()
Connect to a specified session.
const receiver = new MessageReceiver(config)
await receiver.acceptSession(sessionId)
messages = await receiver.receiveMessages(1)
// shutdown when needed
await receiver.closeConnection()
Connect to next session.
const receiver = new MessageReceiver(config)
await receiver.acceptNextSession()
messages = await receiver.receiveMessages(1)
// shutdown when needed
await receiver.closeConnection()
Single call to receive current messages messages.
const receiver = new MessageReceiver(config, action)
// receive a maximum of 10 messages
messages = await receiver.receiveMessages(10)
// shutdown when needed
await receiver.closeConnection()
The receiveMessages
function can also receive all options applicable to Azure Service Bus receiveMessages
as a parameter, see Azure documentation.
await receiver.receiveMessages(10, options)
It is often beneficial when using this to specify the maximum wait time for both the first message and the last message to improve performance of the application. For example:
// This will wait a maximum of one second for the first message, if no message exists then the response will return.
// If a message is received within one second it will wait a further five seconds or until it receives 10 messages to return
messages = await receiver.receiveMessages(batchSize, { maxWaitTimeInMs: 1000, maxTimeAfterFirstMessageInMs: 5000 })
Same as receiveMessages
but does not mark the message as complete so it can still be received by other services once the peek lock expires.
const receiver = new MessageReceiver(config, action)
// receive a maximum of 10 messages
messages = await receiver.peekMessages(10)
// shutdown when needed
await receiver.closeConnection()
Once a message is received through a peek lock, a response must be sent to Azure Service Bus before the lock expires otherwise Service Bus will resend the message.
If this is not the intended behaviour there are several responses that can be sent.
Message is complete and no further processing needed.
await receiver.completeMessage(message)
Message cannot be processed by any client so should be added to dead letter queue.
await receiver.deadLetterMessage(message)
Abandon processing of current message so it can be redelivered, potentially to another client.
await receiver.abandonMessage(message)
Defer message back to queue for later processing. It will not be redelivered to any client unless the receiver command supplies the sequence number as an option.
Further reading on deferring messages
// Defer
await receiver.deferMessage(message)
THIS INFORMATION IS LICENSED UNDER THE CONDITIONS OF THE OPEN GOVERNMENT LICENCE found at:
http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3
The following attribution statement MUST be cited in your products and applications when using this information.
Contains public sector information licensed under the Open Government license v3
The Open Government Licence (OGL) was developed by the Controller of Her Majesty's Stationery Office (HMSO) to enable information providers in the public sector to license the use and re-use of their information under a common open licence.
It is designed to encourage use and re-use of information freely and flexibly, with only a few conditions.