diff --git a/index.js b/index.js index acb2b3e..b53726f 100644 --- a/index.js +++ b/index.js @@ -10,6 +10,9 @@ module.exports = { createConsumer: consumerService.createConsumer, createMessage: messageService.createMessage, createTopic: topicService.createTopic, + getConsumersForTopic: topicService.getConsumersForTopic, + getTopics: topicService.getTopics, getStatus: queueService.getStatus, - getProcessedMessages: queueService.getProcessedMessages + getProcessedMessages: queueService.getProcessedMessages, + getQueueMessages: queueService.getQueueMessages }; \ No newline at end of file diff --git a/models/message.js b/models/message.js index ad8c239..aaf0d02 100644 --- a/models/message.js +++ b/models/message.js @@ -10,6 +10,7 @@ class Message { this._value = ''; this._processing_details = []; this._processed = false; + this._dropped = false; this._allowed_retries = 0; } @@ -49,6 +50,14 @@ class Message { this._processed = value; } + getDropped() { + return this._dropped; + } + + setDropped(value) { + this._dropped = value; + } + setAllowedRetries(value) { this._allowed_retries = value; } diff --git a/models/queue.js b/models/queue.js index 43dd478..e74a516 100644 --- a/models/queue.js +++ b/models/queue.js @@ -8,6 +8,7 @@ class Queue { this._retries = retries; this._queue = []; this._processed_messages = []; + this._messages = []; } getSize() { @@ -27,6 +28,7 @@ class Queue { enQueue(message) { this._queue.unshift(message); + this._messages.push(message); } numberOfMessagesInQueue() { @@ -57,6 +59,10 @@ class Queue { setProcessedMessages(processedMessages) { this._processed_messages = processedMessages; } + + getMessages() { + return this._messages; + } } module.exports = Queue; \ No newline at end of file diff --git a/services/message.handler.service.js b/services/message.handler.service.js index 757078d..1a228ee 100644 --- a/services/message.handler.service.js +++ b/services/message.handler.service.js @@ -45,6 +45,7 @@ const processMessage = function (message) { if (_.isEmpty(consumersForTopic)) { logger.info(`No consumers found for topic ${message.getTopic()}`); message.setProcessed(true); + message.setDropped(true); return Promise.resolve(message); } else { const consumersByPriority = _.groupBy(consumersForTopic, consumer => { diff --git a/services/queue.service.js b/services/queue.service.js index 096363d..be4e208 100644 --- a/services/queue.service.js +++ b/services/queue.service.js @@ -92,6 +92,7 @@ const getProcessedMessages = function () { } return Promise.resolve(QueueInstance.getProcessedMessages()); }; + const pushMessageToProcessedMessage = function (message) { if (!QueueInstance) { try { @@ -106,9 +107,21 @@ const pushMessageToProcessedMessage = function (message) { return Promise.resolve(); }; +const getQueueMessages = function() { + if (!QueueInstance) { + try { + initQueue(); + } catch (err) { + return Promise.reject(err.toString()); + } + } + return Promise.resolve(QueueInstance.getMessages()); +}; + module.exports = { pushMessageToQueue, getStatus, getProcessedMessages, - pushMessageToProcessedMessage + pushMessageToProcessedMessage, + getQueueMessages, }; diff --git a/services/topic.service.js b/services/topic.service.js index 274a7bd..2a51c94 100644 --- a/services/topic.service.js +++ b/services/topic.service.js @@ -37,10 +37,19 @@ const getConsumersForTopic = function (topic) { return Promise.resolve(topicVsConsumer[topic]); }; +const getTopics = function () { + const topics = Object.keys(topicVsConsumer); + if (topics.length === 0) { + return Promise.reject(`No topics created`); + } else { + return Promise.resolve(topics); + } +} module.exports = { isValidTopic, createTopic, registerConsumerForTopic, - getConsumersForTopic + getConsumersForTopic, + getTopics };