Skip to content

Commit

Permalink
Support consumer subsctibe multiple topics for pull consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
LeBW committed Dec 24, 2022
1 parent b64c7a6 commit 9bbefe5
Showing 1 changed file with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.springframework.messaging.Message;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
* @author <a href="mailto:[email protected]">Jim</a>
Expand All @@ -61,7 +62,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>

private volatile boolean running;

private final String topic;
private final String name;

private final MessageSelector messageSelector;

Expand All @@ -71,7 +72,7 @@ public class RocketMQMessageSource extends AbstractMessageSource<Object>

public RocketMQMessageSource(String name,
ExtendedConsumerProperties<RocketMQConsumerProperties> extendedConsumerProperties) {
this.topic = name;
this.name = name;
this.messageSelector = RocketMQUtils.getMessageSelector(
extendedConsumerProperties.getExtension().getSubscription());
this.extendedConsumerProperties = extendedConsumerProperties;
Expand All @@ -80,7 +81,7 @@ public RocketMQMessageSource(String name,

@Override
public synchronized void start() {
Instrumentation instrumentation = new Instrumentation(topic, this);
Instrumentation instrumentation = new Instrumentation(name, this);
try {
if (this.isRunning()) {
throw new IllegalStateException(
Expand All @@ -90,14 +91,18 @@ public synchronized void start() {
.initPullConsumer(topic, extendedConsumerProperties);
// This parameter must be 1, otherwise doReceive cannot be handled singly.
// this.consumer.setPullBatchSize(1);
this.consumer.subscribe(topic, messageSelector);
String[] topics = StringUtils.commaDelimitedListToStringArray(name);
for (String topic: topics) {
this.consumer.subscribe(topic, messageSelector);
// register TopicMessageQueueChangeListener for messageQueuesForTopic
this.consumer.registerTopicMessageQueueChangeListener(topic, messageQueuesForTopic::put);
}
this.consumer.setAutoCommit(false);
// register TopicMessageQueueChangeListener for messageQueuesForTopic
consumer.registerTopicMessageQueueChangeListener(topic,
messageQueuesForTopic::put);
this.consumer.start();
// Initialize messageQueuesForTopic immediately
messageQueuesForTopic.put(topic, consumer.fetchMessageQueues(topic));
for (String topic: topics) {
messageQueuesForTopic.put(topic, consumer.fetchMessageQueues(topic));
}
instrumentation.markStartedSuccessfully();
}
catch (MQClientException e) {
Expand Down Expand Up @@ -128,7 +133,9 @@ private MessageQueue acquireCurrentMessageQueue(String topic, int queueId,
@Override
public synchronized void stop() {
if (this.isRunning() && null != consumer) {
consumer.unsubscribe(topic);
for (String topic: StringUtils.commaDelimitedListToStringArray(name)) {
consumer.unsubscribe(topic);
}
consumer.shutdown();
this.running = false;
}
Expand Down

0 comments on commit 9bbefe5

Please sign in to comment.