Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,10 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
return;
}

if (partitionMetadata->getPartitions() > 0) {
LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
callback(ResultOperationNotSupported, Reader());
return;
}

ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(),
partitionMetadata->getPartitions(), conf,
getListenerExecutorProvider()->get(), callback);
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
ConsumerImplBasePtr consumer = reader->getConsumer();
auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
auto consumer = weakConsumerPtr.lock();
Expand Down
2 changes: 1 addition & 1 deletion lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class ConsumerImpl : public ConsumerImplBase {
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;

virtual void disconnectConsumer();
Result fetchSingleMessageFromBroker(Message& msg);
Expand All @@ -133,7 +134,6 @@ class ConsumerImpl : public ConsumerImplBase {
virtual void redeliverMessages(const std::set<MessageId>& messageIds);

virtual bool isReadCompacted();
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
void beforeConnectionChange(ClientConnection& cnx) override;

protected:
Expand Down
2 changes: 2 additions & 0 deletions lib/ConsumerImplBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "HandlerBase.h"

namespace pulsar {
typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
class ConsumerImplBase;
using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
class OpBatchReceive {
Expand Down Expand Up @@ -76,6 +77,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
virtual uint64_t getNumberOfConnectedConsumer() = 0;
// overrided methods from HandlerBase
virtual const std::string& getName() const override = 0;
virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0;

protected:
// overrided methods from HandlerBase
Expand Down
57 changes: 50 additions & 7 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ using namespace pulsar;
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
int numPartitions, const std::string& subscriptionName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
LookupServicePtr lookupServicePtr,
const Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
lookupServicePtr) {
lookupServicePtr, subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
}

MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr)
LookupServicePtr lookupServicePtr,
const Commands::SubscriptionMode subscriptionMode,
boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
Expand All @@ -60,7 +64,9 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
messageListener_(conf.getMessageListener()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
topics_(topics) {
topics_(topics),
subscriptionMode_(subscriptionMode),
startMessageId_(startMessageId) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
Expand Down Expand Up @@ -226,7 +232,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor, true,
NonPartitioned);
NonPartitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand All @@ -239,7 +245,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
std::string topicPartitionName = topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor,
true, Partitioned);
true, Partitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
Expand Down Expand Up @@ -686,7 +692,12 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
}

void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
callback(ResultOperationNotSupported);
msgId.getTopicName();
auto optConsumer = consumers_.find(msgId.getTopicName());
if (optConsumer) {
unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
}
}

void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
Expand Down Expand Up @@ -1047,3 +1058,35 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
partitionsUpdateTimer_->cancel(ec);
}
}

void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
if (incomingMessagesSize_ > 0) {
callback(ResultOk, true);
return;
}

auto hasMessageAvailable = std::make_shared<std::atomic<bool>>();
auto needCallBack = std::make_shared<std::atomic<int>>(consumers_.size());
auto self = get_shared_this_ptr();

consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
consumer->hasMessageAvailableAsync(
[self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
if (result != ResultOk) {
LOG_ERROR("Filed when acknowledge list: " << result);
// set needCallBack is -1 to avoid repeated callback.
needCallBack->store(-1);
callback(result, false);
return;
}

if (hasMsg) {
hasMessageAvailable->store(hasMsg);
}

if (--(*needCallBack) == 0) {
callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
}
});
});
}
14 changes: 12 additions & 2 deletions lib/MultiTopicsConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "BlockingQueue.h"
#include "Commands.h"
#include "ConsumerImplBase.h"
#include "Future.h"
#include "Latch.h"
Expand Down Expand Up @@ -53,10 +54,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
public:
MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
LookupServicePtr lookupServicePtr);
LookupServicePtr lookupServicePtr,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
boost::optional<MessageId> startMessageId = boost::none);

MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_,
Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
boost::optional<MessageId> startMessageId = boost::none);

~MultiTopicsConsumerImpl();
// overrided methods from ConsumerImplBase
Expand Down Expand Up @@ -88,6 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;

void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
Expand Down Expand Up @@ -118,6 +125,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string> topics_;
std::queue<ReceiveCallback> pendingReceives_;
const Commands::SubscriptionMode subscriptionMode_;
boost::optional<MessageId> startMessageId_;

/* methods */
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
Expand Down Expand Up @@ -167,6 +176,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {

FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
};

typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
Expand Down
29 changes: 22 additions & 7 deletions lib/ReaderImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
#include "MultiTopicsConsumerImpl.h"
#include "TopicName.h"

namespace pulsar {
Expand All @@ -35,9 +36,14 @@ ConsumerConfiguration consumerConfigOfReader;

static ResultCallback emptyCallback;

ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
: topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
ReaderCallback readerCreatedCallback)
: topic_(topic),
partitions_(partitions),
client_(client),
readerConf_(conf),
readerCreatedCallback_(readerCreatedCallback) {}

void ReaderImpl::start(const MessageId& startMessageId,
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
Expand Down Expand Up @@ -80,10 +86,19 @@ void ReaderImpl::start(const MessageId& startMessageId,
test::consumerConfigOfReader = consumerConf.clone();
}

consumer_ = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
if (partitions_ > 0) {
auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId);
consumer_ = consumerImpl;
} else {
auto consumerImpl = std::make_shared<ConsumerImpl>(
client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
startMessageId);
consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
consumer_ = consumerImpl;
}
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
Expand Down
10 changes: 6 additions & 4 deletions lib/ReaderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader;

class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
public:
ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
ReaderCallback readerCreatedCallback);

void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);

Expand All @@ -73,7 +74,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>

Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();

ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; }
ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; }

void hasMessageAvailableAsync(HasMessageAvailableCallback callback);

Expand All @@ -90,9 +91,10 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
void acknowledgeIfNecessary(Result result, const Message& msg);

std::string topic_;
int partitions_;
ClientImplWeakPtr client_;
ReaderConfiguration readerConf_;
ConsumerImplPtr consumer_;
ConsumerImplBasePtr consumer_;
ReaderCallback readerCreatedCallback_;
ReaderListener readerListener_;
};
Expand Down
1 change: 1 addition & 0 deletions lib/UnAckedMessageTrackerEnabled.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
};
} // namespace pulsar

Expand Down
47 changes: 47 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,53 @@ TEST(ConsumerTest, testConsumerEventWithPartition) {
ASSERT_EQ(0, result.size());
}

TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition) {
Client client(lookupUrl);

const std::string topic = "testAcknowledgeCumulativeWithPartition-" + std::to_string(time(nullptr));
const std::string subName = "sub";

int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions",
std::to_string(2));
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

Consumer consumer;
ConsumerConfiguration consumerConfiguration;
consumerConfiguration.setUnAckedMessagesTimeoutMs(10000);
ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));

Producer producer;
ProducerConfiguration producerConfiguration;
producerConfiguration.setBatchingEnabled(false);
producerConfiguration.setPartitionsRoutingMode(
ProducerConfiguration::PartitionsRoutingMode::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));

const int numMessages = 100;
for (int i = 0; i < numMessages; ++i) {
Message msg = MessageBuilder().setContent(std::to_string(i)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}

Message msg;
for (int i = 0; i < numMessages; i++) {
ASSERT_EQ(ResultOk, consumer.receive(msg));
// The last message of each partition topic be ACK
if (i >= numMessages - 2) {
consumer.acknowledgeCumulative(msg.getMessageId());
}
}
ASSERT_EQ(ResultTimeout, consumer.receive(msg, 2000));

// Assert that there is no message in the tracker.
auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
auto tracker =
static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImpl->unAckedMessageTrackerPtr_.get());
ASSERT_EQ(0, tracker->size());

client.close();
}

TEST(ConsumerTest, consumerNotInitialized) {
Consumer consumer;

Expand Down
2 changes: 1 addition & 1 deletion tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class PulsarFriend {
}

static ConsumerImplPtr getConsumer(Reader reader) {
return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer());
}

static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
Expand Down
Loading