Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,7 @@ void ConsumerImpl::sendFlowPermitsToBroker(const ClientConnectionPtr& cnx, int n
Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result result) {
Result handleResult = ResultOk;

static bool firstTime = true;
if (result == ResultOk) {
if (firstTime) {
firstTime = false;
}
LOG_INFO(getName() << "Created consumer on broker " << cnx->cnxString());
{
Lock lock(mutex_);
Expand All @@ -313,12 +309,10 @@ Result ConsumerImpl::handleCreateConsumer(const ClientConnectionPtr& cnx, Result
}

LOG_DEBUG(getName() << "Send initial flow permits: " << config_.getReceiverQueueSize());
if (consumerTopicType_ == NonPartitioned || !firstTime) {
if (config_.getReceiverQueueSize() != 0) {
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
} else if (messageListener_) {
sendFlowPermitsToBroker(cnx, 1);
}
if (config_.getReceiverQueueSize() != 0) {
sendFlowPermitsToBroker(cnx, config_.getReceiverQueueSize());
} else if (messageListener_) {
sendFlowPermitsToBroker(cnx, 1);
}
consumerCreatedPromise_.setValue(get_shared_this_ptr());
} else {
Expand Down
10 changes: 10 additions & 0 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ void MultiTopicsConsumerImpl::handleOneTopicSubscribed(Result result, Consumer c
if (state_.compare_exchange_strong(state, Ready)) {
LOG_INFO("Successfully Subscribed to Topics");
multiTopicsConsumerCreatedPromise_.setValue(get_shared_this_ptr());
// Now all child topics are successfully subscribed, start messageListeners
if (messageListener_ && !conf_.isStartPaused()) {
LOG_INFO("Start messageListeners");
resumeMessageListener();
}
} else {
LOG_ERROR("Unable to create Consumer - " << consumerStr_ << " Error - " << result);
// unsubscribed all of the successfully subscribed partitioned consumers
Expand Down Expand Up @@ -205,6 +210,11 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
ConsumerSubResultPromisePtr topicSubResultPromise) {
std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config = conf_.clone();
// Pause messageListener until all child topics are subscribed.
// Otherwise messages may be acked before the parent consumer gets "Ready", causing ack failures.
if (messageListener_) {
config.setStartPaused(true);
}
auto client = client_.lock();
if (!client) {
topicSubResultPromise->setFailed(ResultAlreadyClosed);
Expand Down