bifromq direct memory leak #118

Closed
masterOcean opened this issue Feb 17, 2025 · 6 comments

Comments

@masterOcean

⚠️ bifromq direct memory leak

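A BifroMQ test cluster, test, is made up of three 2c4g nodes (2 cores, 4 GB RAM each).

  1. Six clients connect to the cluster with the shared subscription $share/group1/test/#.
  2. One node is killed with kill -9, then restarted so that it rejoins the cluster (simulating an abnormal node crash followed by a restart).
  3. The clients on the restarted node reconnect successfully and resubscribe, keeping the same clientId.
  4. 1000 clients load-test another BifroMQ cluster, old: each publishes to the topic test/${clientId} every minute and disconnects every 5 minutes, reconnecting with the same clientId and cleanSession = true.
  5. tcpcopy replicates the old cluster's traffic to the test cluster. After roughly half an hour of load, direct memory is exhausted and the following error is reported: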
[ConditionalRejectHandler.java:39] Reject connection due to HighDirectMemoryUsage: remote=/62.135.200.5:25374

The Netty leak detection log is as follows:

Recent access records: 
#1:
	Hint: 'MqttPreludeHandler' will handle the message from this point.
	io.netty.handler.codec.mqtt.MqttPublishMessage.touch(MqttPublishMessage.java:95)
	io.netty.handler.codec.mqtt.MqttPublishMessage.touch(MqttPublishMessage.java:26)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	com.baidu.bifromq.mqtt.handler.MQTTMessageDebounceHandler.channelRead(MQTTMessageDebounceHandler.java:61)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
#2:
	Hint: 'MQTTMessageDebounceHandler' will handle the message from this point.
	io.netty.handler.codec.mqtt.MqttPublishMessage.touch(MqttPublishMessage.java:95)
	io.netty.handler.codec.mqtt.MqttPublishMessage.touch(MqttPublishMessage.java:26)
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
Created at:
	io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:144)
	io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
	io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:108)
	io.netty.handler.codec.ReplayingDecoderByteBuf.readRetainedSlice(ReplayingDecoderByteBuf.java:588)
	io.netty.handler.codec.mqtt.MqttDecoder.decodePublishPayload(MqttDecoder.java:654)
	io.netty.handler.codec.mqtt.MqttDecoder.decodePayload(MqttDecoder.java:522)
	io.netty.handler.codec.mqtt.MqttDecoder.decode(MqttDecoder.java:114)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:366)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
2024-12-09 14:26:45.840 ERROR   [cluster-tcp-transport] --- io.netty.util.ResourceLeakDetector      [ResourceLeakDetector.java:327] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
#2:
	io.netty.buffer.AdvancedLeakAwareByteBuf.readUnsignedByte(AdvancedLeakAwareByteBuf.java:407)
	io.netty.handler.codec.ReplayingDecoderByteBuf.readUnsignedByte(ReplayingDecoderByteBuf.java:522)
	io.netty.handler.codec.mqtt.MqttDecoder.decodeMsbLsb(MqttDecoder.java:707)
	io.netty.handler.codec.mqtt.MqttDecoder.decodeString(MqttDecoder.java:663)
	io.netty.handler.codec.mqtt.MqttDecoder.decodeString(MqttDecoder.java:659)
	io.netty.handler.codec.mqtt.MqttDecoder.decodePublishVariableHeader(MqttDecoder.java:452)
	io.netty.handler.codec.mqtt.MqttDecoder.decodeVariableHeader(MqttDecoder.java:273)
	io.netty.handler.codec.mqtt.MqttDecoder.decode(MqttDecoder.java:98)
	io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:366)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
#3:
	Hint: 'io.netty.handler.codec.mqtt.MqttDecoder' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.handler.traffic.AbstractTrafficShapingHandler.channelRead(AbstractTrafficShapingHandler.java:506)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
#4:
	Hint: 'trafficShaper' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
#5:
	Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point.
	io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:417)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:407)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
	io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
	io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:120)
	io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:785)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	java.base/java.lang.Thread.run(Thread.java:842)

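The purpose of this test is to replay production traffic onto an offline cluster to verify BifroMQ's stability. The failure reproduces on both 3.2.1 and 3.3.1. However, when the load-testing tool connects directly to the test cluster (without tcpcopy), the error does not occur.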

Because the gRPC serverId changes after a node crashes and restarts, gRPC can no longer find the server, and debug.log prints the following:

java.util.concurrent.CompletionException: com.baidu.bifromq.baserpc.exception.ServerNotFoundException: Server not found: 18f32a8a-f8d4-4d03-a214-3ba4e282b0e9/1183403461
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
        at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674)
        at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
        at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
        at com.baidu.bifromq.mqtt.inbox.MqttBrokerClient$DeliveryPipeline.deliver(MqttBrokerClient.java:130)
        at com.baidu.bifromq.plugin.subbroker.MonitoredSubBroker$MonitoredDeliverer.deliver(MonitoredSubBroker.java:73)
        at com.baidu.bifromq.deliverer.MessageDeliverer$DeliveryCallBatcher$DeliveryBatchCall.execute(MessageDeliverer.java:103)
        at com.baidu.bifromq.basescheduler.Batcher.batchAndEmit(Batcher.java:170)
        at com.baidu.bifromq.basescheduler.Batcher.trigger(Batcher.java:139)
        at com.baidu.bifromq.basescheduler.Batcher.submit(Batcher.java:111)
        at com.baidu.bifromq.basescheduler.BatchCallScheduler.lambda$schedule$2(BatchCallScheduler.java:119)
        at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
        at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
        at com.baidu.bifromq.basescheduler.BatchCallScheduler.schedule(BatchCallScheduler.java:113)
        at com.baidu.bifromq.dist.worker.DeliverExecutor.send(DeliverExecutor.java:96)
        at com.baidu.bifromq.dist.worker.DeliverExecutor.submit(DeliverExecutor.java:63)
        at com.baidu.bifromq.dist.worker.DeliverExecutorGroup.send(DeliverExecutorGroup.java:203)
        at com.baidu.bifromq.dist.worker.DeliverExecutorGroup.prepareSend(DeliverExecutorGroup.java:169)
        at com.baidu.bifromq.dist.worker.DeliverExecutorGroup.submit(DeliverExecutorGroup.java:104)
        at com.baidu.bifromq.dist.worker.DistWorkerCoProc.lambda$batchDist$21(DistWorkerCoProc.java:406)
        at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684)
        at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
        at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2168)
        at com.baidu.bifromq.dist.worker.DistWorkerCoProc.batchDist(DistWorkerCoProc.java:405)
        at com.baidu.bifromq.dist.worker.DistWorkerCoProc.query(DistWorkerCoProc.java:129)
        at com.baidu.bifromq.basekv.store.range.KVRangeQueryRunner.lambda$queryCoProc$4(KVRangeQueryRunner.java:82)
        at com.baidu.bifromq.basekv.store.range.KVRangeQueryRunner.doQuery(KVRangeQueryRunner.java:157)
        at com.baidu.bifromq.basekv.store.range.KVRangeQueryRunner.lambda$submit$9(KVRangeQueryRunner.java:125)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
        at com.baidu.bifromq.basekv.store.range.KVRangeQueryRunner.submit(KVRangeQueryRunner.java:135)

During the tcpcopy test, the test cluster's warn.log prints the following:

2025-02-17 10:04:21.366  WARN   [mqtt-worker-elg-3] --- c.b.b.mqtt.handler.MQTTPreludeHandler   [MQTTPreludeHandler.java:199] ctx: ChannelHandlerContext(MqttPreludeHandler, [id: 0x91739793, L:/10.91.164.15:1883 - R:/62.135.200.220:18144]), cause:
io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
2025-02-17 10:04:21.585  WARN   [mqtt-worker-elg-3] --- c.b.b.mqtt.handler.MQTTPreludeHandler   [MQTTPreludeHandler.java:199] ctx: ChannelHandlerContext(MqttPreludeHandler, [id: 0xdd102bc8, L:/10.91.164.15:1883 - R:/62.135.200.204:56380]), cause:
io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
2025-02-17 10:04:21.966  WARN   [mqtt-worker-elg] --- c.b.b.mqtt.handler.MQTTPreludeHandler   [MQTTPreludeHandler.java:199] ctx: ChannelHandlerContext(MqttPreludeHandler, [id: 0x853caaa6, L:/10.91.164.15:1883 - R:/62.135.200.164:37079]), cause:
io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
2025-02-17 10:04:22.098  WARN   [mqtt-worker-elg-6] --- c.b.b.mqtt.handler.MQTTPreludeHandler   [MQTTPreludeHandler.java:199] ctx: ChannelHandlerContext(MqttPreludeHandler, [id: 0x159a7a8a, L:/10.91.164.15:1883 - R:/62.135.200.71:40581]), cause:
io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
2025-02-17 10:04:22.512  WARN   [mqtt-worker-elg-1] --- c.b.b.mqtt.handler.MQTTPreludeHandler   [MQTTPreludeHandler.java:170] First packet must be mqtt connect message: remote=/62.135.200.100:37086
2025-02-17 10:04:22.663  WARN   [mqtt-worker-elg-2] --- c.b.b.mqtt.handler.MQTTPreludeHandler   [MQTTPreludeHandler.java:170] First packet must be mqtt connect message: remote=/62.135.200.128:31129

Environment

  • Version: [e.g. 3.2.1, 3.3.1]
  • JVM Version: [e.g. OpenJDK17]
  • Hardware Spec: [2c4g]
  • OS: [TencentOS]
  • The Testing Tools
  • etc

Reproducible Steps

  • PUB Client Parameters
    • MQTT Connection:
      • ClientIdentifier:
      • etc...
    • MQTT Pub:
      • Topic: test/${clientId}
      • QoS: [0]
      • Retain: [false]
  • SUB Client Parameters:
    • MQTT Connection:
      • Clean Session: [true ]
      • ClientIdentifier:
      • etc...
    • MQTT Sub:
      • TopicFilter: $share/group1/test/#
      • QoS: [0]
  • The Load Generated

Publicly Accessible Diagnostic Data(If Reproducible Steps are not available)

  • Log files downloadable link:

  • BifroMQ data downloadable link:

  • Configuration files downloadable link:
    java -Xms1500m -Xmx1500m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=300m -XX:MaxDirectMemorySize=300m -server -XX:MaxInlineLevel=15 -Djava.awt.headless=true -XX:+UnlockExperimentalVMOptions -XX:+UnlockDiagnosticVMOptions -XX:+UseZGC -XX:ZAllocationSpikeTolerance=5 -Xlog:async -Xlog:gc:file=/data2/bifromq/bifromq-3.2.1/bin/../logs/gc-%t.log:time,tid,tags:filecount=5,filesize=50m -XX:+CrashOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/data2/bifromq/bifromq-3.2.1/bin/../logs -Dinbox_store_range_voter_count=3 -Dsetting_expire_seconds=60 -Dsetting_provide_init_value=true -DFTB_PLUGIN_CONF=/data2/bifromq/bifromq-3.2.1/bin/../conf/ftb_plugin_conf.yaml -Dio.netty.tryReflectionSetAccessible=true -Dio.netty.allocator.cacheTrimIntervalMillis=5000 --add-opens java.base/java.nio=ALL-UNNAMED -cp /data2/bifromq/bifromq-3.2.1/bin/../lib/* -DLOG_DIR=/data2/bifromq/bifromq-3.2.1/bin/../logs -DCONF_DIR=/data2/bifromq/bifromq-3.2.1/bin/../conf -DDATA_DIR=/data2/bifromq/bifromq-3.2.1/bin/../data -DBIND_ADDR=10.89.144.20 -Dlogback.configurationFile=/data2/bifromq/bifromq-3.2.1/bin/../conf/logback.xml -Dpf4j.pluginsDir=/data2/bifromq/bifromq-3.2.1/bin/../plugins com.baidu.bifromq.starter.StandaloneStarter -c /data2/bifromq/bifromq-3.2.1/bin/../conf/standalone.yml

  • etc

Additional context
Add any other context about the problem here.

@masterOcean
Author

Netty memory leak detection log:

netty_leak_detect.log
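
For anyone trying to capture a similar report: the "Recent access records" sections only appear when Netty's leak detector runs at the advanced or paranoid level, which is controlled by the system property io.netty.leakDetection.level. The sketch below is one possible way to set it programmatically before any Netty class is loaded. The class and method names are hypothetical, and which level was used for the attached log is not stated in this issue.

```java
public class LeakDetectionLevelSketch {
    // System property read by Netty's ResourceLeakDetector when the class is loaded.
    static final String LEVEL_PROPERTY = "io.netty.leakDetection.level";

    // Hypothetical helper: must run before any Netty class is touched,
    // otherwise the detector has already picked up its default level.
    static void enableParanoidLeakDetection() {
        System.setProperty(LEVEL_PROPERTY, "paranoid");
    }

    public static void main(String[] args) {
        enableParanoidLeakDetection();
        System.out.println(LEVEL_PROPERTY + "=" + System.getProperty(LEVEL_PROPERTY));
        // A Netty-based server would be started here, after the property is set.
    }
}
```

Passing -Dio.netty.leakDetection.level=paranoid on the java command line has the same effect and avoids any ordering concern. The paranoid level tracks every buffer and adds noticeable overhead, so it is usually reserved for test clusters.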

@popduke
Member

popduke commented Feb 17, 2025

After the load stops, is the Netty direct memory still not released?

@masterOcean
Author

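After the load stops, is the Netty direct memory still not released?

That's right, it is not released. Only a restart brings the node back to normal.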

@popduke
Member

popduke commented Feb 19, 2025

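tcpcopy does not guarantee the integrity of MQTT sessions, which is why the warnings appear. A patch for this case has been committed on this branch (https://github.com/bifromqio/bifromq/tree/release-v3). Could you build it locally and verify whether the memory release problem still exists?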
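
As an illustration only (this is not the actual patch, whose contents are not shown in this thread): the leak report points at a PUBLISH message whose last recorded touch was on its way into MqttPreludeHandler, and warn.log shows connections rejected because the first packet was not CONNECT. A minimal stdlib-only sketch of that class of bug, assuming a hypothetical CountedMessage stand-in for a Netty reference-counted message, looks like this:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PreludeReleaseSketch {
    /** Hypothetical stand-in for a reference-counted message such as MqttPublishMessage. */
    static final class CountedMessage {
        final String type;
        private final AtomicInteger refCnt = new AtomicInteger(1);

        CountedMessage(String type) {
            this.type = type;
        }

        int refCnt() {
            return refCnt.get();
        }

        /** Decrements the count; returns true once the message is fully released. */
        boolean release() {
            int remaining = refCnt.decrementAndGet();
            if (remaining < 0) {
                throw new IllegalStateException("released more often than retained");
            }
            return remaining == 0;
        }
    }

    /** Leaky variant: rejects a non-CONNECT first packet but never releases it. */
    static boolean leakyPrelude(CountedMessage first) {
        if (!"CONNECT".equals(first.type)) {
            return false; // connection rejected, payload buffer is orphaned
        }
        first.release();
        return true;
    }

    /** Fixed variant: the message is released on every path, accepted or rejected. */
    static boolean fixedPrelude(CountedMessage first) {
        try {
            return "CONNECT".equals(first.type);
        } finally {
            first.release();
        }
    }

    public static void main(String[] args) {
        CountedMessage leaked = new CountedMessage("PUBLISH");
        leakyPrelude(leaked);
        System.out.println("leaky refCnt=" + leaked.refCnt()); // prints "leaky refCnt=1"

        CountedMessage released = new CountedMessage("PUBLISH");
        fixedPrelude(released);
        System.out.println("fixed refCnt=" + released.refCnt()); // prints "fixed refCnt=0"
    }
}
```

In real Netty code the equivalent of release() here would be ReferenceCountUtil.release(msg) in the handler that consumes or discards the message. With a pooled direct allocator, a count that never reaches zero means the underlying direct memory is never returned to the pool, which would be consistent with the memory staying high after the load stops.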

@popduke
Member

popduke commented Feb 25, 2025

A fix is included in v3.3.5. Please verify, and re-open the issue if needed.

@popduke popduke closed this as completed Feb 25, 2025
@masterOcean
Author

Verification passed. Many thanks.
