Skip to content

Conversation

@pwhelan
Copy link
Contributor

@pwhelan pwhelan commented Sep 5, 2025

Summary

Add pause/resume callbacks for in_event_type that stops the interval collector.

Description

When stopping fluent-bit it firsts pauses all input plugins. This is usually meant to stop plugins from creating more tasks. For this to work correctly plugins need to pause their timed collectors at this point.

The plugin in_event_type does not have these callbacks. This leads to situations occasionally where a task is created right as fluent-bit is shutting down leading to a memory leak:

[2025/09/05 14:17:48] [ info] [input:event_type:event_type.0] [OK] collector_time
[2025/09/05 14:17:48] [ info] [engine] pending chunk count: memory=1, filesystem=0; grace_timer=2
[2025/09/05 14:17:48] [ info] [task] event_type/event_type.0 has 1 pending task(s):
[2025/09/05 14:17:48] [ info] [task]   task_id=0 still running on route(s): lib/lib.0 
[2025/09/05 14:17:48] [ info] [engine] service has stopped (1 pending tasks)

=================================================================
==22454==ERROR: LeakSanitizer: detected memory leaks

Indirect leak of 25088 byte(s) in 1 object(s) allocated from:
    #0 0x7f4f752fd9c7 in malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:69
    #1 0x55ce6353df63  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x8e3f63) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f4f752fd340 in calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:77
    #1 0x55ce62e12fec  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1b8fec) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #2 0x55ce62e430d4  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1e90d4) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #3 0x55ce62e3e631  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1e4631) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #4 0x55ce62e405c6  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1e65c6) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #5 0x55ce62df81d2  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x19e1d2) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #6 0x7f4f7525ea41 in asan_thread_start ../../../../src/libsanitizer/asan/asan_interceptors.cpp:234
    #7 0x7f4f7469caa3  (/lib/x86_64-linux-gnu/libc.so.6+0x9caa3) (BuildId: 282c2c16e7b6600b0b22ea0c99010d2795752b5f)

Indirect leak of 24 byte(s) in 1 object(s) allocated from:
    #0 0x7f4f752fd340 in calloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cpp:77
    #1 0x55ce62e13123  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1b9123) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #2 0x55ce62e430d4  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1e90d4) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #3 0x55ce62e3e631  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1e4631) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #4 0x55ce62e405c6  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x1e65c6) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #5 0x55ce62df81d2  (/home/runner/work/calyptia-core-fluent-bit/calyptia-core-fluent-bit/generated/source/build/bin/flb-rt-processor_labels+0x19e1d2) (BuildId: f1ea6883d5a1cbb8870f6e3a3f1a76236adbc214)
    #6 0x7f4f7525ea41 in asan_thread_start ../../../../src/libsanitizer/asan/asan_interceptors.cpp:234
    #7 0x7f4f7469caa3  (/lib/x86_64-linux-gnu/libc.so.6+0x9caa3) (BuildId: 282c2c16e7b6600b0b22ea0c99010d2795752b5f)

SUMMARY: AddressSanitizer: 25184 byte(s) leaked in 3 allocation(s).

Here is a run of the flb-rt-processor_labels hash_label test, which is where I originally encountered the bug:

valgrind --leak-check=full ./bin/flb-rt-processor_labels hash_label
==118880== Memcheck, a memory error detector
==118880== Copyright (C) 2002-2024, and GNU GPL'd, by Julian Seward et al.
==118880== Using Valgrind-3.25.1 and LibVEX; rerun with -h for copyright info
==118880== Command: ./bin/flb-rt-processor_labels hash_label
==118880==
Test hash_label...                              [2025/09/05 13:26:30] [ info] [fluent bit] version=4.0.4, commit=ce77e001e7, pid=118880
[2025/09/05 13:26:30] [ info] [storage] ver=1.5.3, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2025/09/05 13:26:30] [ info] [simd    ] disabled
[2025/09/05 13:26:30] [ info] [cmetrics] version=1.0.3
[2025/09/05 13:26:30] [ info] [ctraces ] version=0.6.6
[2025/09/05 13:26:30] [ info] [input:event_type:event_type.0] initializing
[2025/09/05 13:26:30] [ info] [input:event_type:event_type.0] storage_strategy='memory' (memory only)
[2025/09/05 13:26:30] [ info] [input:event_type:event_type.0] thread instance initialized
[2025/09/05 13:26:30] [ info] [sp] stream processor started
[2025/09/05 13:26:30] [ info] [engine] Shutdown Grace Period=2, Shutdown Input Grace Period=1
[2025/09/05 13:26:31] [ info] [input:event_type:event_type.0] [OK] collector_time
[2025/09/05 13:26:31] [ warn] [engine] service will shutdown in max 2 seconds
[2025/09/05 13:26:31] [ info] [engine] service has stopped (0 pending tasks)
[ OK ]
SUCCESS: All unit tests have passed.
==118880==
==118880== HEAP SUMMARY:
==118880==     in use at exit: 0 bytes in 0 blocks
==118880==   total heap usage: 6,398 allocs, 6,398 frees, 995,348 bytes allocated
==118880==
==118880== All heap blocks were freed -- no leaks are possible
==118880==
==118880== For lists of detected and suppressed errors, rerun with: -s
==118880== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

Enter [N/A] in the box, if an item is not applicable to your change.

Testing
Before we can approve your change; please submit the following in a comment:

  • Example configuration file for the change
  • Debug log output from testing the change
  • Attached Valgrind output that shows no leaks or memory corruption was found

If this is a change to packaging of containers or native binaries then please confirm it works for all targets.

  • Run local packaging test showing all targets (including any new ones) build.
  • Set ok-package-test label to test for all targets (requires maintainer to do).

Documentation

  • Documentation required for this feature

Backporting

  • Backport to latest stable release.

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features
    • Added pause and resume support for the Event Type input plugin, allowing it to be cleanly suspended and resumed via standard lifecycle controls.
  • Bug Fixes
    • Ensures the plugin reliably halts data collection when paused and resumes correctly, improving stability during configuration reloads and controlled shutdowns.

Signed-off-by: Phillip Whelan <phillip.whelan@chronosphere.io>
@coderabbitai
Copy link

coderabbitai bot commented Sep 5, 2025

Walkthrough

Adds an input-instance pointer to the event_type context, stores it during init, and implements pause/resume callbacks to control the collector using the stored collector FD and input instance. Updates the plugin descriptor to register these callbacks. Other collection logic remains unchanged.

Changes

Cohort / File(s) Summary
Event Type input plugin
plugins/in_event_type/event_type.c
Add struct flb_input_instance *ins to struct event_type; set ctx->ins = ins during init; implement cb_event_type_pause/cb_event_type_resume to pause/resume collector via coll_fd and ins; register callbacks in in_event_type_plugin (cb_pause, cb_resume).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant InputEngine
  participant EventTypePlugin as EventType Plugin
  participant Collector

  rect rgb(235, 245, 255)
  note over InputEngine,EventTypePlugin: Initialization
  InputEngine->>EventTypePlugin: init(ins)
  EventTypePlugin->>EventTypePlugin: ctx->ins = ins
  EventTypePlugin->>Collector: register collector -> coll_fd
  end

  rect rgb(245, 235, 255)
  note over InputEngine,EventTypePlugin: Pause lifecycle
  InputEngine->>EventTypePlugin: cb_pause()
  EventTypePlugin->>Collector: pause(coll_fd, ctx->ins)
  Collector-->>EventTypePlugin: paused
  end

  rect rgb(235, 255, 235)
  note over InputEngine,EventTypePlugin: Resume lifecycle
  InputEngine->>EventTypePlugin: cb_resume()
  EventTypePlugin->>Collector: resume(coll_fd, ctx->ins)
  Collector-->>EventTypePlugin: resumed
  end
Loading

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

Suggested labels

backport to v4.0.x

Suggested reviewers

  • koleini
  • fujimotos

Poem

A twitch of ears, a pause so neat,
I stash the ins, keep state complete.
With gentle thump I stop the stream,
Then hop resume—revive the beam.
Coll_fd in paw, I guard the flow,
A nimble bunny, start–stop pro. 🐇✨

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pwhelan-in_event_type-pause-resume

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
plugins/in_event_type/event_type.c (3)

456-462: Log and guard pause errors.

Currently ignores failures from flb_input_collector_pause; add minimal checks and a warning for observability.

Apply:

-static void cb_event_type_pause(void *data, struct flb_config *config)
-{
-    struct event_type *ctx = data;
-
-    flb_input_collector_pause(ctx->coll_fd, ctx->ins);
-}
+static void cb_event_type_pause(void *data, struct flb_config *config)
+{
+    int ret;
+    struct event_type *ctx = data;
+
+    if (!ctx) {
+        return;
+    }
+
+    ret = flb_input_collector_pause(ctx->coll_fd, ctx->ins);
+    if (ret != 0 && ctx->ins) {
+        flb_plg_warn(ctx->ins, "failed to pause collector id=%i", ctx->coll_fd);
+    }
+}

463-469: Mirror resume-side error handling.

Same rationale as pause: check return and log failures to aid shutdown diagnostics.

Apply:

-static void cb_event_type_resume(void *data, struct flb_config *config)
-{
-    struct event_type *ctx = data;
-
-    flb_input_collector_resume(ctx->coll_fd, ctx->ins);
-}
+static void cb_event_type_resume(void *data, struct flb_config *config)
+{
+    int ret;
+    struct event_type *ctx = data;
+
+    if (!ctx) {
+        return;
+    }
+
+    ret = flb_input_collector_resume(ctx->coll_fd, ctx->ins);
+    if (ret != 0 && ctx->ins) {
+        flb_plg_warn(ctx->ins, "failed to resume collector id=%i", ctx->coll_fd);
+    }
+}

473-476: Fix minor typo in config help text.

“optionsa” → “options”.

-     "Set the type of event to deliver, optionsa are: logs, metrics or traces"
+     "Set the type of event to deliver, options are: logs, metrics or traces"
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 9bfd593 and ce77e00.

📒 Files selected for processing (1)
  • plugins/in_event_type/event_type.c (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
plugins/in_event_type/event_type.c (1)
src/flb_input.c (2)
  • flb_input_collector_pause (1951-1991)
  • flb_input_collector_resume (2014-2070)
🔇 Additional comments (3)
plugins/in_event_type/event_type.c (3)

46-46: Storing input instance in context looks good.

This enables safe pause/resume without depending on external state.


409-409: Init correctly persists the input instance.

Assignment is early enough and before collector setup; no issues.


499-501: Approve code changes: only the time collector (flb_input_set_collector_time) is used and the .cb_pause/.cb_resume callbacks are correctly wired.

@edsiper edsiper merged commit 070e761 into master Sep 5, 2025
56 checks passed
@edsiper edsiper deleted the pwhelan-in_event_type-pause-resume branch September 5, 2025 19:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants