diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java index 8c931cd37..451faec01 100644 --- a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java +++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java @@ -155,7 +155,7 @@ public class KafkaConfig extends AbstractConfig implements Serializable { CONSUMER_START_OFFSET_DOC) .define(ENABLE_AUTO_COMMIT_CONFIG, ConfigDef.Type.BOOLEAN, - true, + false, ConfigDef.Importance.MEDIUM, ENABLE_AUTO_COMMIT_DOC) .define(CHECKPOINT_STORE_NAME_PREFIX_CONFIG, @@ -209,7 +209,6 @@ public ConsumerConfig getConsumerConfig() { if (!props.containsKey(GROUP_ID_CONFIG)) { props.put(GROUP_ID_CONFIG, getString(GROUP_ID_CONFIG)); } - props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); return new ConsumerConfig(props); } diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala index 3119f4025..49c116ca5 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala @@ -116,9 +116,9 @@ private[kafka] class FetchThread( resetConsumers(nextOffsets) reset = false } - val hasMoreMessages = fetchMessage + val fetchMore: Boolean = fetchMessage sleeper.reset() - if (!hasMoreMessages) { + if (!fetchMore) { // sleep for given duration sleeper.sleep(fetchSleepMS) } @@ -133,19 +133,21 @@ private[kafka] class FetchThread( /** * fetch message from each TopicAndPartition in a round-robin way + * + * @return whether to fetch more messages */ private def fetchMessage: Boolean = { - consumers.foldLeft(false) { (hasNext, tpAndConsumer) => - val (_, consumer) = tpAndConsumer - if (incomingQueue.size < fetchThreshold) { + if (incomingQueue.size >= fetchThreshold) { + false + } else { + consumers.foldLeft(false) { (hasNext, tpAndConsumer) => + val (_, consumer) = tpAndConsumer if (consumer.hasNext) { incomingQueue.put(consumer.next()) true } else { hasNext } - } else { - true } } }