Skip to content

Commit

Permalink
Allow a waitTime of zero. The wait is buggy: it will sleep in vain un…
Browse files Browse the repository at this point in the history
…less another thread called __peekedMessages at the exact time it was sleeping.
  • Loading branch information
jmigueprieto committed Jan 4, 2025
1 parent 406b298 commit 0b1968b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,8 @@ public final boolean pushIfNotExists(

@Override
public final List<String> pop(String queueName, int count, int timeout) {
// Keep the timeout to a minimum of 100ms
if (timeout < 100) {
timeout = 100;
}
List<QueueMessage> messages = get(queueName).pop(count, timeout, TimeUnit.MILLISECONDS);
return messages.stream().map(msg -> msg.getId()).collect(Collectors.toList());
return messages.stream().map(QueueMessage::getId).collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public QueueMonitor(String queueName) {

public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
if (count <= 0) {
log.warn("Negative poll count {}");
log.warn("Negative poll count {}", count);
// Negative number shouldn't happen, but it can be zero and in that case we don't do
// anything!
return new ArrayList<>();
Expand All @@ -76,7 +76,7 @@ public List<QueueMessage> pop(int count, int waitTime, TimeUnit timeUnit) {
// The sleep method below, just does Thread.wait should be more CPU friendly
QueueMessage message = peekedMessages.poll();
if (message == null) {
if (!waited) {
if (!waited && waitTime > 0) {
Uninterruptibles.sleepUninterruptibly(waitTime, timeUnit);
waited = true;
continue;
Expand Down

0 comments on commit 0b1968b

Please sign in to comment.