From 0bdb8f1d3413f37ff63c2e289fa473be2be68a5f Mon Sep 17 00:00:00 2001 From: Louis Lambeau Date: Fri, 10 May 2024 15:01:38 +0200 Subject: [PATCH] Pass x- headers to queue driver on message publish. --- src/config/schema.fio.ts | 3 ++- src/jobs/dispatcher.ts | 8 +++++--- src/queue/drivers/amqp.ts | 6 ++++-- src/queue/index.ts | 1 + src/server/rest/index.ts | 14 +++++++++++++- 5 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/config/schema.fio.ts b/src/config/schema.fio.ts index 7ba73c0..8409789 100644 --- a/src/config/schema.fio.ts +++ b/src/config/schema.fio.ts @@ -5,7 +5,7 @@ ID = String(s | /[a-z]+[a-z_-]+/.test(s)) #### QUEUE -AMQP.Exchange.Type = String :: { "topic", "direct", "fanout" } +AMQP.Exchange.Type = String :: { "topic", "direct", "fanout", "x-delayed-message" } AMQP.Exchange = { name : String @@ -13,6 +13,7 @@ AMQP.Exchange = { default :? Boolean options :? { durable: Boolean + ...: . } } diff --git a/src/jobs/dispatcher.ts b/src/jobs/dispatcher.ts index 3c46939..0fa03a4 100644 --- a/src/jobs/dispatcher.ts +++ b/src/jobs/dispatcher.ts @@ -96,7 +96,9 @@ export default class JobDispatcher { dispatched: new Date(), }))); - const promises = jobs.map(job => Arnavon.queue.push(jobName, job)); + delete options['strict']; + + const promises = jobs.map(job => Arnavon.queue.push(jobName, job, options)); return Promise.all(promises) .then(() => jobs); @@ -109,7 +111,7 @@ export default class JobDispatcher { return this.jobs[metadata.jobName as string].validator; } - dispatch(jobName: string, data: unknown, meta = {}) { + dispatch(jobName: string, data: unknown, meta = {}, extraOptions = {}) { const jobConfig = this.jobs[jobName]; if (!jobConfig) { this.#counters.unknown.inc({ jobName }); @@ -133,7 +135,7 @@ export default class JobDispatcher { dispatched: new Date(), })); - return Arnavon.queue.push(jobName, job) + return Arnavon.queue.push(jobName, job, extraOptions) .then(() => job); } diff --git a/src/queue/drivers/amqp.ts b/src/queue/drivers/amqp.ts index 54f38f9..e60289d 100644 --- a/src/queue/drivers/amqp.ts +++ b/src/queue/drivers/amqp.ts @@ -43,6 +43,7 @@ export type AMQPQueueConfig = { export type AQMPPushOptions = { exchange: string + headers?: Record } class AmqpQueue extends Queue { @@ -182,12 +183,13 @@ class AmqpQueue extends Queue { return this; } - _push(key: string, data: unknown, { exchange }: AQMPPushOptions) { + _push(key: string, data: unknown, { exchange, headers }: AQMPPushOptions) { if (!this.#channel) { throw new Error('Cannot push, no channel found'); } const payload = Buffer.from(JSON.stringify(data)); - const options = { persistent: true }; + const options = { persistent: true, headers }; + return new Promise((resolve, reject) => { return (this.#channel as amqplib.ConfirmChannel) .publish(exchange || this.#exchange, key, payload, options, (err) => { diff --git a/src/queue/index.ts b/src/queue/index.ts index 9022c7f..65f370d 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -51,6 +51,7 @@ class Queue extends EventEmitter { // subclasses should implement _push(key, data) push(key: string, data: unknown, opts = {}): Promise { logger.info(`${this.constructor.name} - Pushing to queue`, key, data); + return this._push(key, data, opts).then((job) => { logger.info(`${this.constructor.name} - Pushed`); return job; diff --git a/src/server/rest/index.ts b/src/server/rest/index.ts index 973f59a..9de9654 100644 --- a/src/server/rest/index.ts +++ b/src/server/rest/index.ts @@ -44,9 +44,21 @@ export default (dispatcher: JobDispatcher) => { }); } + // Pass all x-* headers (except x-arnavon-*) to the queue + // This allows using special rabbitmq things like delayed messages, ttl, etc + const headers = Object.keys(req.headers) + .filter(h => h.toLowerCase().startsWith('x-') && !h.toLowerCase().startsWith('x-arnavon')) + .reduce((headers, k) => { + headers[k] = req.headers[k]; + return headers; + }, {}); + // Decide on the dispatch mode const dispatchFn = pushMode === 'SINGLE' ? dispatcher.dispatch : dispatcher.dispatchBatch; - dispatchFn.bind(dispatcher)(req.params.id, req.body, { id: req.id }, { strict: validationMode === 'ALL-OR-NOTHING' }) + dispatchFn.bind(dispatcher)(req.params.id, req.body, { id: req.id }, { + strict: validationMode === 'ALL-OR-NOTHING', + headers, + }) .then((job) => { return res.status(201).send(job); })