From 0b1968b0fff03c439cf5edb6174a8fc466dfe6f1 Mon Sep 17 00:00:00 2001
From: Miguel Prieto <miguel.prieto@orkes.io>
Date: Sat, 4 Jan 2025 08:39:46 -0300
Subject: [PATCH] Allow a waitTime of zero. The wait is buggy: it will sleep in
 vain unless another thread called __peekedMessages at the exact time it was
 sleeping.

---
 .../io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java     | 6 +-----
 .../main/java/io/orkes/conductor/mq/redis/QueueMonitor.java | 4 ++--
 2 files changed, 3 insertions(+), 7 deletions(-)

diff --git a/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java b/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java
index b0dc844..a8f2aab 100644
--- a/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java
+++ b/orkes-conductor-queues/src/main/java/io/orkes/conductor/queue/dao/BaseRedisQueueDAO.java
@@ -101,12 +101,8 @@ public final boolean pushIfNotExists(
 
     @Override
     public final List<String> pop(String queueName, int count, int timeout) {
-        // Keep the timeout to a minimum of 100ms
-        if (timeout < 100) {
-            timeout = 100;
-        }
         List<QueueMessage> messages = get(queueName).pop(count, timeout, TimeUnit.MILLISECONDS);
-        return messages.stream().map(msg -> msg.getId()).collect(Collectors.toList());
+        return messages.stream().map(QueueMessage::getId).collect(Collectors.toList());
     }
 
     @Override
diff --git a/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java b/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java
index 78e00f1..c6f7c1c 100644
--- a/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java
+++ b/orkes-redis-queues/src/main/java/io/orkes/conductor/mq/redis/QueueMonitor.java
@@ -56,7 +56,7 @@ public QueueMonitor(String queueName) {
 
     public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
         if (count <= 0) {
-            log.warn("Negative poll count {}");
+            log.warn("Negative poll count {}", count);
             // Negative number shouldn't happen, but it can be zero and in that case we don't do
             // anything!
             return new ArrayList<>();
@@ -76,7 +76,7 @@ public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
                 // The sleep method below, just does Thread.wait should be more CPU friendly
                 QueueMessage message = peekedMessages.poll();
                 if (message == null) {
-                    if (!waited) {
+                    if (!waited && waitTime > 0) {
                         Uninterruptibles.sleepUninterruptibly(waitTime, timeUnit);
                         waited = true;
                         continue;