Skip to content

Commit 1695f4e

Browse files
committed
KREST-182: (Correctly) fences access to internal consumer buffer.
Concurrent cosume requests on the same consumer could race in between calls to `KafkaConsumerState.hasNext()` and `KafkaConsumerState.next()`. I use double-checked locking in `KafkaConsumerReadTask` to allow multiple consume requests on the same consumer to make progress in parallel (instead of one request completing at a time, per consumer).
1 parent 140b00e commit 1695f4e

File tree

2 files changed

+156
-228
lines changed

2 files changed

+156
-228
lines changed

kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerReadTask.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class KafkaConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
3939

4040
private static final Logger log = LoggerFactory.getLogger(KafkaConsumerReadTask.class);
4141

42-
private KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
42+
private final KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> parent;
4343
private final long requestTimeoutMs;
4444
// the minimum bytes the task should accumulate
4545
// before returning a response (or hitting the timeout)
@@ -138,11 +138,18 @@ public boolean isDone() {
138138
*/
139139
private void addRecords() {
140140
while (!exceededMinResponseBytes && !exceededMaxResponseBytes && parent.hasNext()) {
141-
maybeAddRecord();
141+
synchronized (parent) {
142+
if (parent.hasNext()) {
143+
maybeAddRecord();
144+
}
145+
}
142146
}
143147
while (!exceededMaxResponseBytes && parent.hasNextCached()) {
144-
// will not call poll() anymore. Continue draining loaded records
145-
maybeAddRecord();
148+
synchronized (parent) {
149+
if (parent.hasNextCached()) {
150+
maybeAddRecord();
151+
}
152+
}
146153
}
147154
}
148155

0 commit comments

Comments
 (0)