Skip to content

Commit

Permalink
[GEARPUMP-197] fix busy loop in FetchThread
Browse files Browse the repository at this point in the history
Author: manuzhang <[email protected]>

Closes apache#77 from manuzhang/fix_fetch_thread.
  • Loading branch information
manuzhang committed Aug 30, 2016
1 parent 529799c commit 3c0ebb1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
CONSUMER_START_OFFSET_DOC)
.define(ENABLE_AUTO_COMMIT_CONFIG,
ConfigDef.Type.BOOLEAN,
true,
false,
ConfigDef.Importance.MEDIUM,
ENABLE_AUTO_COMMIT_DOC)
.define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG,
Expand Down Expand Up @@ -209,7 +209,6 @@ public ConsumerConfig getConsumerConfig() {
if (!props.containsKey(GROUP_ID_CONFIG)) {
props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG));
}
props.put(ENABLE_AUTO_COMMIT_CONFIG, "false");

return new ConsumerConfig(props);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ private[kafka] class FetchThread(
resetConsumers(nextOffsets)
reset = false
}
val hasMoreMessages = fetchMessage
val fetchMore: Boolean = fetchMessage
sleeper.reset()
if (!hasMoreMessages) {
if (!fetchMore) {
// sleep for given duration
sleeper.sleep(fetchSleepMS)
}
Expand All @@ -133,19 +133,21 @@ private[kafka] class FetchThread(

/**
* fetch message from each TopicAndPartition in a round-robin way
*
* @return whether to fetch more messages
*/
private def fetchMessage: Boolean = {
consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
val (_, consumer) = tpAndConsumer
if (incomingQueue.size < fetchThreshold) {
if (incomingQueue.size >= fetchThreshold) {
false
} else {
consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
val (_, consumer) = tpAndConsumer
if (consumer.hasNext) {
incomingQueue.put(consumer.next())
true
} else {
hasNext
}
} else {
true
}
}
}
Expand Down

0 comments on commit 3c0ebb1

Please sign in to comment.