
[Bug] Broker memory leak #22157

Closed
1 of 2 tasks
graysonzeng opened this issue Feb 29, 2024 · 13 comments · Fixed by #22191
Assignees: Technoboy-
Labels: area/broker, type/bug
Milestone: 3.3.0

Comments

@graysonzeng (Contributor)

Search before asking

  • I searched in the issues and found nothing similar.

Version

v3.1.1

Minimal reproduce step

After running for a period of time, the broker's memory usage gradually increases and eventually leads to a restart.
(screenshot: broker memory usage growing over time)

A heap dump shows that many ManagedLedgerImpl instances are retained in memory, and these instances occupy most of the heap.

(screenshot: heap dump with ManagedLedgerImpl instances retaining most of the memory)

Common path to the accumulation point:

(screenshots: Eclipse MAT "common path to the accumulation point" view)


What did you expect to see?

Memory is reclaimed normally by GC.

What did you see instead?

The broker restarts.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@dao-jun (Member) commented Feb 29, 2024

Could you please upload the heap dump file?

@graysonzeng (Contributor, Author)

The heap dump has reached 13GB and cannot be uploaded

@dao-jun (Member) commented Feb 29, 2024

> The heap dump has reached 13GB and cannot be uploaded

Could you compress it and upload it to a cloud drive, such as Baidu Cloud (百度云)? It is very important for locating the root cause.

@dao-jun added the type/bug and area/broker labels on Feb 29, 2024
@dao-jun added this to the 3.3.0 milestone on Feb 29, 2024
@Technoboy- self-assigned this on Feb 29, 2024
@lhotari (Member) commented Feb 29, 2024

> The heap dump has reached 13GB and cannot be uploaded
>
> Could you compress it and upload it to a cloud drive, such as Baidu Cloud (百度云)? It is very important for locating the root cause.

@graysonzeng @dao-jun Please note that a heap dump can contain sensitive data; because of this, it should never be shared without encryption. Encrypting it for a specific recipient with gpg is one possible solution.

@lhotari (Member) commented Feb 29, 2024

@graysonzeng from the screenshots, it looks like the problem is caused by the 6.5 million NonDurableCursorImpl instances.

@lhotari (Member) commented Feb 29, 2024

I'd recommend adding the https://github.com/vlsi/mat-calcite-plugin plugin to Eclipse MAT so that you can run SQL queries against the heap dump. Eclipse MAT has OQL support, but that's not as handy as SQL queries, where you can do anything that Calcite supports with SQL.

It's not useful for this case, but here is an example of a Calcite query using Eclipse MAT + the Calcite plugin:

select clientVersion, count(clientVersion) from org.apache.pulsar.broker.service.ServerCnx group by 1 order by 2 desc

@graysonzeng (Contributor, Author)

(screenshot: NonDurableCursorImpl instances referenced from waitingCursors in ManagedLedgerImpl)

@lhotari Yes. I found that these instances are referenced by waitingCursors in the ManagedLedgerImpl instances, and at the same time the cursors are in the isActive = false state. This indicates they should have been deleted and should not still be retained by waitingCursors.

We use StarRocks routine load tasks, which consume via readers and repeatedly create and delete consumers. Therefore, many NonDurableCursorImpl instances are generated.
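
For illustration, here is a minimal sketch of that create-and-close pattern with the Pulsar Java client (the service URL, topic name, and loop count are made-up placeholders, not taken from the report); each Reader is backed by a non-durable subscription, so the broker creates one NonDurableCursorImpl per iteration:

```java
import org.apache.pulsar.client.api.*;

public class ReaderChurn {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // placeholder broker URL
                .build();
        // Repeatedly create and close readers, as a routine-load style job does.
        for (int i = 0; i < 1000; i++) {
            Reader<byte[]> reader = client.newReader()
                    .topic("persistent://prop/ns-test/routine-load-topic") // hypothetical topic
                    .startMessageId(MessageId.earliest)
                    .create();
            while (reader.hasMessageAvailable()) {
                reader.readNext();
            }
            reader.close(); // the broker-side non-durable cursor should be released here
        }
        client.close();
    }
}
```

If the broker fails to drop the cursor from waitingCursors when the reader or consumer closes, each iteration of such a loop leaks one cursor.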

@lhotari (Member) commented Feb 29, 2024

@graysonzeng related to #13939 ?

@graysonzeng (Contributor, Author)

The Pulsar version is 3.1.1. It does look related to #13939. It looks like removeWaitingCursor may not be properly removing the cursor after deactivateCursor() sets the cursor's isActive to false. @lhotari

deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);

@lhotari (Member) commented Feb 29, 2024

Another possibility is that non-durable cursors and related subscriptions should be cleaned up when a connection dies in an unexpected way. I'm not sure how that is handled in the code base currently.

@lhotari (Member) commented Feb 29, 2024

Or maybe they should be cleaned up after an inactivity period?
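
Purely as a hypothetical sketch of that idea (this is not code from the Pulsar codebase; the helper, queue, accessor names, and threshold are assumptions for illustration), an inactivity sweep over the waiting cursors could look like this:

```java
import java.util.Queue;
import org.apache.bookkeeper.mledger.ManagedCursor;

final class InactiveCursorSweep {
    // Hypothetical helper: drop non-durable cursors that have been inactive longer
    // than a threshold, so an abandoned cursor cannot stay in the waiting list forever.
    static void evictInactiveWaitingCursors(Queue<ManagedCursor> waitingCursors, long maxInactiveMillis) {
        final long now = System.currentTimeMillis();
        waitingCursors.removeIf(cursor ->
                !cursor.isDurable()                                   // only non-durable cursors
                && now - cursor.getLastActive() > maxInactiveMillis); // idle past the threshold
    }
}
```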

@Technoboy- (Contributor)

There is a race condition between consumer.close and checkForNewEntries.
When the consumer is closed, the cursor is removed from waitingCursors:

public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor) throws BrokerServiceException {
    cursor.updateLastActive();
    if (dispatcher != null) {
        dispatcher.removeConsumer(consumer);
    }
    // preserve accumulative stats form removed consumer
    ConsumerStatsImpl stats = consumer.getStats();
    bytesOutFromRemovedConsumers.add(stats.bytesOutCounter);
    msgOutFromRemovedConsumer.add(stats.msgOutCounter);
    if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
        deactivateCursor();
        topic.getManagedLedger().removeWaitingCursor(cursor);
        if (!cursor.isDurable()) {
            // ... (rest of the method omitted)
Then, after line 311 executes, the cursor is added back to waitingCursors.
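
As a rough sketch of one way to close that window (illustrative only, not necessarily what the fix in #22191 does; it assumes the cursor exposes isDurable()/isActive() and that the ledger keeps a waitingCursors queue, as described above), the re-add path could skip cursors whose consumers are already gone:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.bookkeeper.mledger.ManagedCursor;

final class WaitingCursorGuard {
    private final Queue<ManagedCursor> waitingCursors = new ConcurrentLinkedQueue<>();

    // Illustrative guard: if a delayed checkForNewEntries fires after removeConsumer()
    // has already deactivated the cursor, do not put the non-durable cursor back into
    // waitingCursors, otherwise it is retained until the topic is unloaded.
    void addWaitingCursor(ManagedCursor cursor) {
        if (!cursor.isDurable() && !cursor.isActive()) {
            return; // consumer already removed; dropping the cursor avoids the leak
        }
        waitingCursors.add(cursor);
    }
}
```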

Reproduction test

  1. Add a delay to checkForNewEntries, e.g. 10 seconds:

    private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Object ctx) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Re-trying the read at position {}", ledger.getName(), name, op.readPosition);
            }
            if (!hasMoreEntries()) {
                // ... (rest of the method omitted)

    (screenshot)

  2. Then the test below reproduces the issue.

    @Test
    public void testWaitingCursors() throws Exception {
        final String ns = "prop/ns-test";
        admin.namespaces().createNamespace(ns, 2);
        final String topicName = "persistent://prop/ns-test/testWaitingCursors";
        admin.topics().createNonPartitionedTopic(topicName);
        // Non-durable subscription, so the broker backs it with a NonDurableCursorImpl.
        final Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscriptionName("sub-2").subscribe();
        final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
        producer.send("test");
        producer.close();
        final String broker = admin.lookups().lookupTopic(topicName);
        final Optional<Topic> topic = pulsar.getBrokerService().getTopic(topicName, false).join();
        assertNotNull(topic.get());
        PersistentTopic persistentTopic = (PersistentTopic) topic.get();
        final PersistentSubscription subscription = persistentTopic.getSubscription("sub-2");
        NonDurableCursorImpl cursor = (NonDurableCursorImpl) subscription.getCursor();
        final Message<String> receive = consumer.receive();
        assertEquals("test", receive.getValue());
        // Closing the consumer should deactivate the cursor and remove it from waitingCursors,
        // but the delayed checkForNewEntries adds it back.
        consumer.close();
        while (true) {
            ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
            log.info("waitingCursorsCount : {}", ledger.getWaitingCursorsCount());
            Thread.sleep(5 * 1000);
        }
    }
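
If the injected delay lets checkForNewEntries run after consumer.close(), the loop above should keep logging a non-zero waitingCursorsCount even though the subscription no longer has any consumers, which matches the accumulation of inactive NonDurableCursorImpl instances seen in the heap dump.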
