Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #669] Support RocketMQ as storage of mqtt broker #687

Merged
merged 9 commits into from
Jul 21, 2021

Conversation

maixiaohai
Copy link
Contributor

@maixiaohai maixiaohai commented Mar 9, 2021

What is the purpose of the change

issue #669

Brief changelog

  1. refactor config, add some rmq config
  2. add rmq producer for message publish
  3. add rmq consumer for message subscribe and unsubscribe

Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

  • Make sure there is a Github issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in test module.
  • Run mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle to make sure basic checks pass. Run mvn clean install -DskipITs to make sure unit-test pass. Run mvn clean test-compile failsafe:integration-test to make sure integration-test pass.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@@ -26,6 +26,16 @@ limitations under the License.
<name>rocketmq-iot-bridge ${project.version}</name>

<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tools has depends on client module.

workerGroup = new NioEventLoopGroup(MQTTBridgeConfiguration.threadNumOfWorkerGroup());
this.bridgeConfig = new MqttBridgeConfig();

subscriptionStore = new InMemorySubscriptionStore();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default InMemory subscription?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subscription related with consumer logic. I'll finish it at consumer part.


subscriptionStore = new InMemorySubscriptionStore();
if (bridgeConfig.isEnableRocketMQStore()) {
this.publishProducer = new RocketMQPublishProducer(bridgeConfig);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If disable, where init publishProducer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default store is rmq, but the old design stored in memory(simple mode) could be keeped.
User could choose if turn on the rmq store, developer could use simple mode to test basic feature.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the memory mode or rmq storage mode be the default? I don't think the default mode is memory mode, which is not a production-available mode. If we could reach a consensus, we should change the enable logic for the default choice.

if (bridgeConfig.isEnableRocketMQStore()) {
messageDispatcher.registerHandler(Message.Type.MQTT_PUBLISH, new MqttPublishMessageHandler(messageStore, publishProducer));
} else {
// TODO: mqtt cluster inner forwarder
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see implementation while not too many todo :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cluster mode need management of consumer offset and client, I'll finish it at consumer part.

ChannelFuture channelFuture = serverBootstrap.bind().sync();
channelFuture.channel().closeFuture().sync();
logger.info("start the MQTTServer success.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better print all necesarry configuration when you starting server. Only success or ok is too little significant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix it~

log.error("send msg to rocketMQ failed. clientId:" + client.getId(), e);
}

// TODO: qos1, qos2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qos1 need consumer response.
qos2 means Exactly once, it's difficult to implemet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, we could postpone to support QoS 2

@vongosling
Copy link
Member

Add some necessary tests, making this is a relatively complete commit. I will merge this pr.

@maixiaohai
Copy link
Contributor Author

Add some necessary tests, making this is a relatively complete commit. I will merge this pr.

I'm working on the branch with consumer part.When I finish the first version of consumer, I will add necessary tests~

@maixiaohai
Copy link
Contributor Author

ping~

@vongosling
Copy link
Member

@maixiaohai I will merge it. pls go on :-0

@vongosling vongosling merged commit 3aa8ee5 into apache:mqtt-mesh Jul 21, 2021
@vongosling vongosling linked an issue Jul 21, 2021 that may be closed by this pull request
2 tasks
flyingzhang pushed a commit to flyingzhang/rocketmq-externals that referenced this pull request Oct 28, 2021
…#687)

* Config refactor, add some rocketmq config

* Add rmq publish producer

* Fix NPE start with memory mode, remove useless dependency

* Non-standard mqtt request return directly

* Add rmq subscribe consumer and related pull task, complete mqtt SUBSCRIBE and UNSUBSCRIBE protcol

* Fix subTopic not remove when disconnect

* Fix connection management, fix consume delay to make pull task to queue level

* Fix ut not pass when root topic removed from subsription

* Fix integration test not pass when not use rmq subscribe consumer

Co-authored-by: zhangxu16 <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[KEPLER-MISSION] Apache RocketMQ 6.0 subtasks - native mqtt support
2 participants