Skip to content

Commit ac7fdeb

Browse files
committed
Fix getConsumerKeyHashRanges
1 parent 0055a24 commit ac7fdeb

File tree

2 files changed

+48
-23
lines changed

2 files changed

+48
-23
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java

+38-16
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
6060
* It is not changed unless a consumer is removed or a colliding consumer with higher priority is added.
6161
*/
6262
private static class HashRingEntry {
63-
private final List<Consumer> consumers;
63+
// This class is used to store the consumer and the order in which it was added to the hash ring
64+
// sorting will be by priority, consumer name and the order in which it was added
65+
record ConsumerEntry(Consumer consumer, int addOrder) {
66+
}
67+
68+
private final List<ConsumerEntry> consumers;
6469
private final int hash;
6570
Consumer selectedConsumer;
6671

@@ -70,22 +75,24 @@ public HashRingEntry(int hash) {
7075
}
7176

7277
public void addConsumer(Consumer consumer) {
73-
consumers.add(consumer);
74-
selectConsumer();
78+
consumers.add(new ConsumerEntry(consumer, consumers.size()));
79+
selectConsumer(true);
7580
}
7681

7782
public void removeConsumer(Consumer consumer) {
78-
consumers.remove(consumer);
79-
selectConsumer();
83+
consumers.removeIf(consumerEntry -> consumerEntry.consumer().equals(consumer));
84+
selectConsumer(false);
8085
}
8186

8287
public Consumer getSelectedConsumer() {
8388
return selectedConsumer;
8489
}
8590

86-
private void selectConsumer() {
91+
private void selectConsumer(boolean performSorting) {
8792
if (consumers.size() > 1) {
88-
sortConsumersByPriorityLevelAndName(consumers);
93+
if (performSorting) {
94+
sortConsumersByPriorityLevelAndName(consumers);
95+
}
8996
List<Consumer> priorityConsumers = pickPriorityConsumers(consumers);
9097
if (priorityConsumers.size() > 1) {
9198
// use the hash to select a consumer from the priority consumers
@@ -104,24 +111,27 @@ private void selectConsumer() {
104111
selectedConsumer = priorityConsumers.get(0);
105112
}
106113
} else if (consumers.size() == 1) {
107-
selectedConsumer = consumers.get(0);
114+
selectedConsumer = consumers.get(0).consumer();
108115
} else {
109116
selectedConsumer = null;
110117
}
111118
}
112119

113-
private static List<Consumer> pickPriorityConsumers(List<Consumer> consumers) {
114-
Consumer firstConsumer = consumers.get(0);
120+
private static List<Consumer> pickPriorityConsumers(List<ConsumerEntry> consumers) {
121+
Consumer firstConsumer = consumers.get(0).consumer();
115122
List<Consumer> priorityConsumers = consumers.stream()
116-
.takeWhile(c -> c.getPriorityLevel() == firstConsumer.getPriorityLevel()
117-
&& c.consumerName().equals(firstConsumer.consumerName()))
123+
.takeWhile(c -> c.consumer().getPriorityLevel() == firstConsumer.getPriorityLevel()
124+
&& c.consumer().consumerName().equals(firstConsumer.consumerName()))
125+
.map(ConsumerEntry::consumer)
118126
.toList();
119127
return priorityConsumers;
120128
}
121129

122-
private static void sortConsumersByPriorityLevelAndName(List<Consumer> consumers) {
123-
consumers.sort(Comparator.comparing(Consumer::getPriorityLevel).reversed()
124-
.thenComparing(Consumer::consumerName));
130+
private static void sortConsumersByPriorityLevelAndName(List<ConsumerEntry> consumers) {
131+
consumers.sort(Comparator.<ConsumerEntry, Integer>
132+
comparing(entry -> entry.consumer().getPriorityLevel()).reversed()
133+
.thenComparing(entry -> entry.consumer().consumerName())
134+
.thenComparing(ConsumerEntry::addOrder));
125135
}
126136
}
127137

@@ -191,12 +201,24 @@ public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
191201
Map<Consumer, List<Range>> result = new LinkedHashMap<>();
192202
rwLock.readLock().lock();
193203
try {
204+
if (hashRing.isEmpty()) {
205+
return result;
206+
}
194207
int start = 0;
208+
int lastKey = 0;
195209
for (Map.Entry<Integer, HashRingEntry> entry: hashRing.entrySet()) {
196210
Consumer consumer = entry.getValue().getSelectedConsumer();
197211
result.computeIfAbsent(consumer, key -> new ArrayList<>())
198212
.add(Range.of(start, entry.getKey()));
199-
start = entry.getKey() + 1;
213+
lastKey = entry.getKey();
214+
start = lastKey + 1;
215+
}
216+
// Handle wrap-around
217+
HashRingEntry firstHashRingEntry = hashRing.firstEntry().getValue();
218+
Consumer firstSelectedConsumer = firstHashRingEntry.getSelectedConsumer();
219+
List<Range> ranges = result.get(firstSelectedConsumer);
220+
if (lastKey != Integer.MAX_VALUE - 1) {
221+
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
200222
}
201223
} finally {
202224
rwLock.readLock().unlock();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.assertj.core.api.Assertions.assertThat;
2122
import static org.mockito.Mockito.mock;
2223
import static org.mockito.Mockito.when;
2324
import java.util.ArrayList;
@@ -148,9 +149,15 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
148149
for (String s : consumerName) {
149150
Consumer consumer = mock(Consumer.class);
150151
when(consumer.consumerName()).thenReturn(s);
152+
when(consumer.getPriorityLevel()).thenReturn(0);
153+
when(consumer.toString()).thenReturn(s);
151154
selector.addConsumer(consumer);
152155
consumers.add(consumer);
153156
}
157+
158+
assertThat(selector.getConsumerKeyHashRanges())
159+
.containsExactlyEntriesOf(selector.getConsumerKeyHashRanges());
160+
154161
Map<Consumer, List<Range>> expectedResult = new HashMap<>();
155162
expectedResult.put(consumers.get(0), Arrays.asList(
156163
Range.of(119056335, 242013991),
@@ -159,17 +166,13 @@ public void testGetConsumerKeyHashRanges() throws BrokerServiceException.Consume
159166
expectedResult.put(consumers.get(1), Arrays.asList(
160167
Range.of(0, 90164503),
161168
Range.of(90164504, 119056334),
162-
Range.of(382436668, 722195656)));
169+
Range.of(382436668, 722195656),
170+
Range.of(1914695767, 2147483646)));
163171
expectedResult.put(consumers.get(2), Arrays.asList(
164172
Range.of(242013992, 242377547),
165173
Range.of(242377548, 382436667),
166174
Range.of(1656011843, 1707482097)));
167-
for (Map.Entry<Consumer, List<Range>> entry : selector.getConsumerKeyHashRanges().entrySet()) {
168-
System.out.println(entry.getValue());
169-
Assert.assertEquals(entry.getValue(), expectedResult.get(entry.getKey()));
170-
expectedResult.remove(entry.getKey());
171-
}
172-
Assert.assertEquals(expectedResult.size(), 0);
175+
assertThat(selector.getConsumerKeyHashRanges()).containsExactlyInAnyOrderEntriesOf(expectedResult);
173176
}
174177

175178
// reproduces https://github.com/apache/pulsar/issues/22050

0 commit comments

Comments
 (0)