Skip to content

Commit cca3e55

Browse files
committed
skip putting already sent messages into staging buffer to avoid session memory leaking
1 parent b915fb7 commit cca3e55

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

bifromq-inbox/bifromq-inbox-client/src/main/java/com/baidu/bifromq/inbox/client/InboxReader.java

-3
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@ class InboxReader implements IInboxClient.IInboxReader {
2424
private final long incarnation;
2525
private final InboxFetchPipeline ppln;
2626
private final long sessionId = HLC.INST.get();
27-
private int latestBufferCapacity = 100;
2827
private volatile long lastFetchQoS0Seq = -1;
2928
private volatile long lastFetchSendBufferSeq = -1;
30-
private volatile long lastFetchQoS2Seq = -1;
3129

3230
public InboxReader(String inboxId,
3331
long incarnation,
@@ -54,7 +52,6 @@ public void fetch(Consumer<Fetched> consumer) {
5452

5553
@Override
5654
public void hint(int bufferCapacity) {
57-
latestBufferCapacity = bufferCapacity;
5855
try {
5956
ppln.hint(sessionId, inboxId, incarnation, bufferCapacity, lastFetchQoS0Seq, lastFetchSendBufferSeq);
6057
} catch (Throwable e) {

bifromq-mqtt/bifromq-mqtt-server/src/main/java/com/baidu/bifromq/mqtt/handler/MQTTPersistentSessionHandler.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,9 @@ public final void channelInactive(ChannelHandlerContext ctx) {
195195
memUsage.addAndGet(-estBaseMemSize());
196196
int remainInboxSize =
197197
stagingBuffer.values().stream().reduce(0, (acc, msg) -> acc + msg.estBytes(), Integer::sum);
198-
memUsage.addAndGet(-remainInboxSize);
198+
if (remainInboxSize > 0) {
199+
memUsage.addAndGet(-remainInboxSize);
200+
}
199201
ctx.fireChannelInactive();
200202
}
201203

@@ -502,6 +504,10 @@ private void pubQoS0Message(InboxMessage inboxMsg) {
502504
}
503505

504506
private void pubBufferedMessage(InboxMessage inboxMsg) {
507+
if (inboxMsg.getSeq() < nextSendSeq) {
508+
// do not buffer message that has been sent
509+
return;
510+
}
505511
String topicFilter = inboxMsg.getTopicFilter();
506512
TopicFilterOption option = inboxMsg.getOption();
507513
addFgTask(authProvider.checkPermission(clientInfo(), buildSubAction(topicFilter, option.getQos())))
@@ -515,8 +521,9 @@ private void pubBufferedMessage(InboxMessage inboxMsg) {
515521
tenantMeter.timer(msg.qos() == AT_LEAST_ONCE ? MqttQoS1InternalLatency : MqttQoS2InternalLatency)
516522
.record(HLC.INST.getPhysical() - message.getTimestamp(), TimeUnit.MILLISECONDS);
517523
SubMessage prev = stagingBuffer.put(inboxMsg.getSeq(), msg);
518-
assert prev == null;
519-
memUsage.addAndGet(msg.estBytes());
524+
if (prev == null) {
525+
memUsage.addAndGet(msg.estBytes());
526+
}
520527
});
521528
}
522529

0 commit comments

Comments
 (0)