diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc index e5df421a..289bd343 100644 --- a/lib/ConsumerImpl.cc +++ b/lib/ConsumerImpl.cc @@ -292,11 +292,7 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) { Result handleResult = ResultOk; - static bool firstTime = true; if (result == ResultOk) { - if (firstTime) { - firstTime = false; - } LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString()); { Lock lock(mutex_); @@ -313,12 +309,10 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result } LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize()); - if (consumerTopicType_ == NonPartitioned || !firstTime) { - if (config_.getReceiverQueueSize() != 0) { - sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize()); - } else if (messageListener_) { - sendFlowPermitsToBroker(cnx, 1); - } + if (config_.getReceiverQueueSize() != 0) { + sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize()); + } else if (messageListener_) { + sendFlowPermitsToBroker(cnx, 1); } consumerCreatedPromise_.setValue(get_shared_this_ptr()); } else { diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index e95a9ac4..dddade5c 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -149,6 +149,11 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c if (state_.compare_exchange_strong(state, Ready)) { LOG_INFO("Successfully Subscribed to Topics"); multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr()); + // Now all child topics are successfully subscribed, start messageListeners + if (messageListener_ && !conf_.isStartPaused()) { + LOG_INFO("Start messageListeners"); + resumeMessageListener(); + } } else { LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result); // unsubscribed all of the successfully subscribed partitioned consumers @@ -205,6 +210,11 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN ConsumerSubResultPromisePtr topicSubResultPromise) { std::shared_ptr consumer; ConsumerConfiguration config = conf_.clone(); + // Pause messageListener until all child topics are subscribed. + // Otherwise messages may be acked before the parent consumer gets "Ready", causing ack failures. + if (messageListener_) { + config.setStartPaused(true); + } auto client = client_.lock(); if (!client) { topicSubResultPromise->setFailed(ResultAlreadyClosed);