Skip to content

Commit

Permalink
Variable modified in read method
Browse files Browse the repository at this point in the history
  • Loading branch information
guan46 committed Jan 27, 2025
1 parent 2a9d4ac commit be39684
Showing 1 changed file with 11 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,12 +439,15 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
permitsForConsumer.computeIfAbsent(consumer,
k -> new MutableInt(getAvailablePermits(k)));
// a consumer was found for the sticky key hash and the entry can be dispatched
if (permits.intValue() > 0
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
if (permits.intValue() > 0) {
boolean canDispatchEntry = canDispatchEntry(consumer, entry, readType, stickyKeyHash);
if (!canDispatchEntry) {
blockedByHash.setTrue();
// decrement the permits for the consumer
permits.decrement();
// allow the entry to be dispatched
dispatchEntry = true;
}
}
}
}
Expand Down Expand Up @@ -507,27 +510,18 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {

// checks if the entry can be dispatched to the consumer
private boolean canDispatchEntry(Consumer consumer, Entry entry,
ReadType readType, int stickyKeyHash,
MutableBoolean blockedByHash) {
ReadType readType, int stickyKeyHash) {
// If redeliveryMessages contains messages that correspond to the same hash as the entry to be dispatched
// do not send those messages for order guarantee
if (readType == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}

if (drainingHashesRequired) {
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
if (blockedByHash != null) {
blockedByHash.setTrue();
}
return false;
}
}

return true;
}

Expand Down Expand Up @@ -748,3 +742,4 @@ public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {

private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
}

0 comments on commit be39684

Please sign in to comment.