spring-cloud-starter-stream-rocketmq: problems with multiple clusters and a large number of Topics #3533

Closed
liuxuzxx opened this issue Dec 12, 2023 · 25 comments

@liuxuzxx

Which Component
The spring-cloud-starter-stream-rocketmq component

Describe what problem you have encountered
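We have run into a dilemma.
Scenario: a service, msg-hub, uses spring-cloud-stream to connect to three RocketMQ clusters. Over time it ends up connected to roughly 400 Topics on each cluster. The number of RocketMQ producer threads has grown to more than 2,000, and the container runs into OOM. What we have traced so far: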

  1. When StreamBridge's send method is used to send a message to a RocketMQ cluster, StreamBridge creates a MessageChannel object keyed by binder (cluster name) + ":" + Topic.
  2. The MessageChannel is spring-cloud-stream's wrapper around a client for multiple MQ types; for RocketMQ it is concretely backed by a RocketMQProducerMessageHandler object.
  3. The RocketMQProducerMessageHandler holds a defaultMQProducer object (of type DefaultMQProducer).
  4. Each defaultMQProducer creates a new MQClientInstance object, because the clientId is different every time, and the difference comes from instanceName.
  5. When RocketMQProduceFactory initializes the defaultMQProducer, it runs the following code to set the instanceName property:
// instanceName is set per Topic
producer.setInstanceName(
        RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid()));

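In short: each Topic maps to one MessageChannel object, which maps to one defaultMQProducer object, which maps to one new MQClientInstance object. The biggest problem is that each MQClientInstance starts 10 threads: 4 are business send threads and 6 are supporting threads.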
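
To make the chain above concrete, here is a minimal, dependency-free sketch of why a per-Topic instanceName multiplies client instances. It assumes a simplified client id of the form `ip@instanceName` and uses the 10-threads-per-instance figure reported in this issue; `ClientIdSketch`, `distinctClients`, the IP address, and the pid are hypothetical names and values for illustration, not RocketMQ APIs.

```java
import java.util.HashSet;
import java.util.Set;

final class ClientIdSketch {
    // Threads per client instance, taken from the figure reported in this issue.
    static final int THREADS_PER_CLIENT = 10;

    // Simplified model of a client id: "<ip>@<instanceName>" (an assumption).
    static String clientId(String ip, String instanceName) {
        return ip + "@" + instanceName;
    }

    // Number of distinct client ids (and so client instances) for one cluster.
    static int distinctClients(int topicCount, boolean instanceNamePerTopic) {
        long pid = 4242L; // placeholder process id
        Set<String> ids = new HashSet<>();
        for (int i = 0; i < topicCount; i++) {
            String topic = "Topic_" + i;
            String instanceName = instanceNamePerTopic
                    ? topic + "|" + pid   // mirrors the quoted setInstanceName call
                    : "shared|" + pid;    // hypothetical: one name per cluster
            ids.add(clientId("10.0.0.1", instanceName));
        }
        return ids.size();
    }

    public static void main(String[] args) {
        int perTopic = distinctClients(400, true);
        int shared = distinctClients(400, false);
        System.out.println("per-topic instanceName: " + perTopic + " clients, ~"
                + perTopic * THREADS_PER_CLIENT + " threads");
        System.out.println("shared instanceName: " + shared + " client, ~"
                + shared * THREADS_PER_CLIENT + " threads");
    }
}
```

Under these assumptions, 400 Topics on one cluster give 400 client ids with a per-Topic name and a single id with a shared name, which is why reusing the instance name (and so the MQClientInstance) is the property being asked for here.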

Describe what information you have read
How spring-cloud-stream and spring-cloud-starter-stream-rocketmq create the MessageChannel, and how the corresponding DefaultMQProducer is created.

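Expectation:
Is there a hook that would let us decide whether a new MQClientInstance has to be created every time, so that MQClientInstance objects can be reused? Any advice would be appreciated.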

@liuxuzxx
Author

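Looking at spring-cloud-stream, its design is organized per Topic, but current versions (including the latest, v4.1.0) still have a bug in how entries are removed once channelCache exceeds dynamic-destination-cache-size. The keys used for channelCache and for bindingService differ: one uses binder (the MQ cluster name) + ":" + Topic, the other uses the Topic alone. As a result, the cache entry is cleaned up, but the call that would actually close the producer never takes effect.

Even if the close did run, creating a new MQClientInstance every time is slow; without reuse, performance suffers.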
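
The key mismatch described above can be modelled without Spring. The sketch below is a simplification built on assumptions, not the actual StreamBridge code: `channelCache` is a size-bounded `LinkedHashMap` keyed by `binder:topic`, `openBindings` stands in for the binding service keyed by Topic only, and all class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class EvictionKeyMismatchSketch {
    // Stand-in for the binding service: open producer bindings, keyed by Topic only.
    final Set<String> openBindings = new HashSet<>();
    final Map<String, String> channelCache;

    EvictionKeyMismatchSketch(final int cacheSize) {
        // Insertion-ordered map that drops its eldest entry once the limit is exceeded.
        this.channelCache = new LinkedHashMap<String, String>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                boolean evict = size() > cacheSize;
                if (evict) {
                    // The cache key is "binder:topic" but bindings are keyed by
                    // "topic", so this remove() never matches anything.
                    openBindings.remove(eldest.getKey());
                }
                return evict;
            }
        };
    }

    void send(String binder, String topic) {
        String key = binder + ":" + topic;
        if (!channelCache.containsKey(key)) {
            openBindings.add(topic); // a producer is created and bound
            channelCache.put(key, "channel for " + key);
        }
    }

    public static void main(String[] args) {
        EvictionKeyMismatchSketch s = new EvictionKeyMismatchSketch(2);
        for (int i = 0; i < 5; i++) {
            s.send("nansha", "Topic_" + i);
        }
        System.out.println("cached channels: " + s.channelCache.size());
        System.out.println("open bindings: " + s.openBindings.size());
    }
}
```

With a cache size of 2 and five Topics, the model ends with 2 cached channels but 5 open bindings: eviction shrinks the cache, while the producers behind the evicted entries are never released.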

@liuxuzxx
Author

@fangjian0423 could you advise how to solve this? One of our services now has more than 1,700 threads.

@liuxuzxx
Author

@chickenlj could you take a look at this when you have time? Thanks.

@liuxuzxx liuxuzxx mentioned this issue Dec 22, 2023
@axeon

axeon commented Jan 4, 2024

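All of Alibaba's applications have this problem: they spawn threads freely with no regard for context-switch cost.
Start any Nacos instance and it opens more than 1,000 threads; no idea what it needs them for.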


github-actions bot commented Feb 3, 2024

This issue has been open 30 days with no activity. This will be closed in 7 days.

@github-actions github-actions bot added the stale label Feb 3, 2024
@liuxuzxx
Author

liuxuzxx commented Feb 5, 2024

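I have found a workaround for now: use the bytebuddy dependency to intercept the method in the producer factory (RocketMQProduceFactory) that builds the instance, and rewrite the topic argument passed in, adjusting it as needed. It is a last resort, used only because there was no better option.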

@github-actions github-actions bot removed the stale label Feb 5, 2024

github-actions bot commented Mar 6, 2024

This issue has been open 30 days with no activity. This will be closed in 7 days.

@github-actions github-actions bot added the stale label Mar 6, 2024
@liuxuzxx
Author

Found another way to handle it: overriding the class.

@github-actions github-actions bot removed the stale label Mar 14, 2024

This issue has been open 30 days with no activity. This will be closed in 7 days.

@github-actions github-actions bot added the stale label Apr 13, 2024
@liuxuzxx
Author

I checked, and this problem is already fixed in a newer version:

<spring-cloud-alibaba.version>2021.0.6.0</spring-cloud-alibaba.version>

@liuxuzxx
Author

When upgrading, though, be aware that we ran into a spring-cloud-stream issue: there are some problems with how bytes are handled.

@lionhuo

lionhuo commented Dec 13, 2024

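@liuxuzxx Hi, could you tell me how you configure spring-cloud-stream to connect to three RocketMQ clusters? There is no official example, and the standard spring cloud stream way of configuring it does not take effect for me. I am on spring cloud alibaba 2022.0.0.0.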

@liuxuzxx
Author

spring:
  cloud:
    function:
      definition: commandConsumer;channelSwitchConsumer;timingMessageConsumer;eventConsumer;immediateMessageSwitchConsumer
    stream:
      default-binder: nansha # the default binder
      dynamicDestinationCacheSize: 200 # size of the MessageChannel cache inside stream; default is 10, 1000 recommended; each Topic takes one entry
      bindings:
        commandConsumer-in-0:
          destination:  Topic_1
          group: topic-1-consumer-group
          content-type: application/json
          consumer:
            concurrency: 4
          binder: nansha # binder for this binding; several binders are configured below, each one a RocketMQ cluster
        immediateMessageSwitchConsumer-in-0:
          destination: Topic_2
          group: topic-1-consumer-group
          content-type: application/json
          consumer:
            concurrency: 1
          binder: cloud # binder for this binding; if omitted, default-binder is used
      binders:
        nansha:
          type: rocketmq
          environment:
            spring:
              cloud:
                stream:
                  rocketmq:
                    binder:
                      name-server: 172.16.0.10:9876
                      group: nansha-group
                      unitName: nansha
                  dynamicDestinationCacheSize: 200
        cloud:
          type: rocketmq
          environment:
            spring:
              cloud:
                stream:
                  rocketmq:
                    binder:
                      name-server: 172.16.0.11:9876;172.16.0.12:9876
                      group: cloud-group
                      unitName: cloud
                  dynamicDestinationCacheSize: 200

@liuxuzxx
Author

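Note: when sending messages with StreamBridge, always specify the binder.

 streamBridge.send("<topic name>", "<binder name: nansha or cloud>", message);

If the binder argument is omitted, default-binder is used. But if the two clusters (nansha/cloud) have Topics with the same name, sending fails with an error. So name the binder explicitly wherever you can and do not rely on the default-binder setting; otherwise you will run into all sorts of problems.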
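
One way to enforce this rule in code is a thin wrapper that rejects sends without a binder name. The sketch below uses a hypothetical `Sender` interface as a stand-in for the three-argument `streamBridge.send(...)` call shown above, so it runs without Spring; `BinderRequiredSender` is likewise an illustrative name, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

final class ExplicitBinderSketch {
    // Hypothetical stand-in with the same argument order as the
    // streamBridge.send(topic, binder, message) call shown above.
    interface Sender {
        boolean send(String topic, String binder, Object payload);
    }

    // Wrapper that refuses to fall back to default-binder.
    static final class BinderRequiredSender {
        private final Sender delegate;

        BinderRequiredSender(Sender delegate) {
            this.delegate = delegate;
        }

        boolean send(String topic, String binder, Object payload) {
            if (binder == null || binder.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "binder must be given explicitly for topic " + topic);
            }
            return delegate.send(topic, binder, payload);
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // Fake sender that only records where each message would go.
        BinderRequiredSender sender = new BinderRequiredSender(
                (topic, binder, payload) -> sent.add(binder + ":" + topic));
        sender.send("Topic_1", "nansha", "hello");
        sender.send("Topic_1", "cloud", "hello"); // same Topic name, other cluster
        System.out.println(sent);
    }
}
```

In a real service the lambda would presumably delegate to `streamBridge.send(topic, binder, payload)`; the point of the wrapper is only that an omitted binder fails fast instead of silently going to default-binder.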

@liuxuzxx
Author

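Problems found so far:

  1. With a very large number of Topics (for example more than 3,000), stream initialization at service startup is slow, roughly 30-60 s, even if the service itself only handles 2-3 Topics. We have not found a solution; use with caution.
  2. With multiple binders (that is, multiple clusters), do not rely on the default-binder setting. Even if it is configured, name the binder explicitly in both code and configuration; otherwise you get odd problems that are hard to diagnose and force you to read the source.
  3. Set dynamicDestinationCacheSize generously. The default is 10, and errors appear once more than 10 are in use. We set it to the int maximum (about 2.1 billion). It does not take much memory; one of our services handles more than 30 Topics without problems. A single service usually will not handle more than 50 Topics. If you need to handle more than 100, prefer the native rocketmq/kafka client libraries over stream.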

@lionhuo

lionhuo commented Dec 13, 2024

Thanks a lot! I'll give it a try.

@lionhuo

lionhuo commented Dec 13, 2024

Did you configure spring.binders?
In the config file only rocketmq is recognized as a binder; with binders configured like mine, rocketmq1 is shown as an error.

    stream:
      function:
        definition: testFunction
      binders:
        rocketmq:
          type: rocketmq
          default-candidate: false
          environment:
            spring.cloud.stream.rocketmq:
              binder:
                name-server: 192.168.1.1:9876
              bindings:
                test-out-0:
                  producer:
                    group: test-producer-group
                    tags: test
        rocketmq1:
          type: rocketmq
          default-candidate: false
          environment:
            spring.cloud.stream.rocketmq:
              binder:
                name-server: 192.168.1.2:9876
              bindings:
                test1-out-0:
                  producer:
                    group: test1-producer-group
                    tags: test1
#        bindings
      default-binder: rocketmq
      bindings:
        test1-out-0:
          destination: test1-topic
          group: test1-producer-group 
          content-type: application/json
          binder: rocketmq1   # an error is reported here

I also get this error at startup:

java.lang.IllegalStateException: rocketmq_context has not been refreshed yet
	at org.springframework.context.support.AbstractApplicationContext.assertBeanFactoryActive(AbstractApplicationContext.java:1117) ~[spring-context-6.0.4.jar:6.0.4]
	at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1275) ~[spring-context-6.0.4.jar:6.0.4]

@liuxuzxx
Author

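This configuration looks a bit odd. spring.cloud.stream really has just two parts:

  1. spring.cloud.stream.bindings is where consumers and producers are configured
  2. spring.cloud.stream.binders is where connections to MQ clusters are configured, that is, the multi-cluster settings

Each consumer or producer under bindings has to say which cluster it belongs to, which is why binders must be configured. That is the relationship between the two.

In your configuration it looks like the two are mixed together, with bindings configured under binders. I have not tried that style myself, though.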
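
The relationship between the two sections can be sketched as a lookup: each binding names a binder, or falls back to default-binder, and the name has to match an entry under binders. This is a simplified model under that assumption, not Spring Cloud Stream's actual resolution logic; `BinderResolutionSketch` and `resolve` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class BinderResolutionSketch {
    // Returns the binder each binding would use; a null value in `bindings`
    // means the binding did not name a binder.
    static Map<String, String> resolve(Set<String> binders,
                                       Map<String, String> bindings,
                                       String defaultBinder) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : bindings.entrySet()) {
            String binder = e.getValue() != null ? e.getValue() : defaultBinder;
            if (binder == null || !binders.contains(binder)) {
                throw new IllegalStateException("binding " + e.getKey()
                        + " refers to unknown binder " + binder);
            }
            resolved.put(e.getKey(), binder);
        }
        return resolved;
    }

    public static void main(String[] args) {
        Set<String> binders = new HashSet<>(Arrays.asList("nansha", "cloud"));
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("commandConsumer-in-0", "nansha");
        bindings.put("immediateMessageSwitchConsumer-in-0", "cloud");
        bindings.put("eventConsumer-in-0", null); // falls back to default-binder
        System.out.println(resolve(binders, bindings, "nansha"));
    }
}
```

A binding that names a binder missing from `binders` (for example `rocketmq1` when only `nansha` and `cloud` are defined) makes `resolve` throw, which mirrors why each binder referenced under bindings needs its own entry under binders.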

@liuxuzxx
Author

This project covers basically all of the scenarios.

@lionhuo

lionhuo commented Dec 13, 2024

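OK, I'll take a look. The configuration above follows the rocketmq style. To set rocketmq-specific properties such as messageModel and subscription, you still need a bindings section inside stream.binders.rocketmq.environment. Specifying the binder under bindings does work; it turns out I was misled by the IDE earlier.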

spring:
  cloud:
    stream:
      function:
        definition: test
      binders:
        rocketmq:
          type: rocketmq
          default-candidate: false
          environment:
            spring.cloud.stream.rocketmq:
              binder:
                name-server: 192.168.1.1:9876
              bindings:
                test-out-0:
                  producer:
                    group: test-producer-group
                    tags: test
        rocketmq1:
          type: rocketmq
          default-candidate: false
          environment:
            spring.cloud.stream.rocketmq:
              binder:
                name-server: 192.168.1.2:9876
              bindings:
                test-in-0:
                  consumer:
                    messageModel: BROADCASTING
                    subscription: testTag
      bindings:
        test-in-0:
          destination: test_in_topic
          group: test-in-consumer-group
          content-type: application/json
          binder: rocketmq1                       # still flagged here: Invalid value 'rocketmq1', but it works

It works correctly now, although startup still reports these errors:

java.lang.IllegalStateException: rocketmq_context has not been refreshed yet
java.lang.IllegalStateException: rocketmq1_context has not been refreshed yet

@liuxuzxx
Author

You mean this, right? It can actually be configured under spring.cloud.stream.bindings alone. For example, this is what we have:

spring:
  cloud:
    stream:
      bindings:
        commandConsumer-in-0:
          destination: <topic name>
          group: <consumer group>
          content-type: application/json
          consumer:
            concurrency: 4
            consumerFromWhere: CONSUME_FROM_FIRST_OFFSET
            pullBatchSize: 10

@lionhuo

lionhuo commented Dec 13, 2024

That looks cleaner. I'll move my configuration over and try it.

@lionhuo

lionhuo commented Dec 13, 2024

If ACL is enabled on rocketmq, configuring the producer this way has no effect, and the consumer's messageModel and subscription have no effect here either:

spring:
  cloud:
    stream:
      bindings:
        test-out-0:
          destination: test_out_topic
          group: test-producer-group
          content-type: application/json
          producer:
            accessKey: test-name       # no effect
            secretKey: test-password  # no effect
          binder: test-binder
        test-in-0:
          destination: test_in_topic
          group: test-consumer-group
          content-type: application/json
          consumer:
            messageModel: BROADCASTING  # no effect
            subscription: testLive  # no effect
          binder: test-binder

But this does take effect, so it seems some binder-specific properties still have to go in the bindings section under rocketmq:

spring:
  cloud:
    stream:
      function:
        definition: test
      binders:
        test-binder:
          type: rocketmq
          default-candidate: false
          environment:
            spring.cloud.stream.rocketmq:
              binder:
                name-server: 192.168.1.1:9876
              bindings:
                test-out-0:
                  producer:
                    group: test-producer-group
                    tags: test
                    accessKey: test-username # works
                    secretKey: test-password  # works
                test-in-0:
                  consumer:
                    messageModel: BROADCASTING  # works
                    subscription: testLive  # works

@liuxuzxx
Author

Good to know. We have not used that many of these options ourselves.
