Conversation

@santigimeno
Member

@santigimeno santigimeno commented Jun 2, 2025

Introduce AsyncTSQueueOptions for batching: notification is sent when the queue size reaches min_size or after max_time ms, whichever comes first.
Implements timer-based batching using nsuv::ns_timer, with all timer and notification logic simplified for clarity and efficiency.
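
A minimal usage sketch of the new options (the create(loop, opts, cb, ...) factory and enqueue() call follow the diffs quoted later in this thread; the element type, callback shape, include path, and omitted namespace qualification are illustrative assumptions, not taken from the header):

#include "async_ts_queue.h"  // adjust path/namespace to the actual header location

void example(uv_loop_t* loop) {
  AsyncTSQueueOptions opts;
  opts.min_size = 100;  // flush once 100 items are queued...
  opts.max_time = 50;   // ...or after 50 ms, whichever comes first

  // Callback parameters are left generic because the exact callback signature
  // is not shown here; it runs on the loop thread once a batch is ready.
  auto queue = AsyncTSQueue<int>::create(loop, opts, [](auto&&...) {
    // drain and process the accumulated items
  });
  queue->enqueue(42);
}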

Summary by CodeRabbit

  • New Features
    • Added batching support for asynchronous queue notifications, enabling triggers based on configurable minimum queue size or maximum delay.
    • Introduced comprehensive tests covering batching triggers, timer behavior, and thread safety.
  • Improvements
    • Maintained legacy immediate notification behavior alongside new batching options.
    • Enhanced test reliability by running the event loop in a dedicated thread with synchronized callbacks.

@santigimeno santigimeno self-assigned this Jun 2, 2025
@coderabbitai

coderabbitai bot commented Jun 2, 2025

Important

Review skipped

Auto reviews are disabled on base/target branches other than the default branch.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This update adds batching support to the AsyncTSQueue class by introducing configurable minimum size and maximum time thresholds for triggering asynchronous notifications. It includes new batching options, timer management, conditional notification logic, and preserves legacy single-item notification behavior. The test suite was enhanced to run the event loop in a dedicated thread with condition variable synchronization and includes new tests covering batching and thread safety.

Changes

  • src/nsolid/async_ts_queue.h: Added AsyncTSQueueOptions struct; new batching-enabled constructors and factory methods; timer management with nsuv::ns_timer; conditional async notification logic; batching state members; modified destructor to handle timer cleanup.
  • test/cctest/test_nsolid_async_ts_queue.cc: Replaced the default loop with a dedicated uv_loop_t and a thread running it; added a uv_async_t for loop shutdown; synchronized tests using a mutex and condition variable; added tests for batching behavior and thread safety; disabled some legacy tests by commenting them out.

Sequence Diagram(s)

sequenceDiagram
    participant Producer as Producer Thread
    participant Queue as AsyncTSQueue
    participant Timer as Timer (optional)
    participant Consumer as Callback

    Producer->>Queue: enqueue(item)
    alt Batching enabled
        alt Queue size >= min_size
            Queue->>Queue: trigger_async()
            Queue->>Consumer: Invoke callback with batch
        else Queue size < min_size
            Queue->>Timer: start_timer()
            Timer-->>Queue: After max_time, trigger_async()
            Queue->>Consumer: Invoke callback with batch
        end
    else Batching disabled
        alt Queue size == 1
            Queue->>Queue: trigger_async()
            Queue->>Consumer: Invoke callback with item
        end
    end

Poem

🐇 In queues where tasks align,
Batching waits for size or time.
Timers tick, then calls arise,
Or immediate if size complies.
Tests now run with threads in sync,
Rabbits hop without a blink!
⏰📦✨


🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve to resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@santigimeno
Member Author

@coderabbitai review

@coderabbitai

coderabbitai bot commented Jun 2, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@windsurf-bot

windsurf-bot bot commented Jun 2, 2025

I ran into an unexpected issue while reviewing this PR. Please try again later.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 5

🔭 Outside diff range comments (1)
src/nsolid/async_ts_queue.h (1)

71-73: ⚠️ Potential issue

Add timer cleanup in destructor to prevent memory leak.

The timer is allocated with new but never deleted, causing a memory leak.

Apply this diff to properly clean up the timer:

 ~AsyncTSQueue() {
   async_handle_->close_and_delete();
+  if (timer_ != nullptr) {
+    timer_->close_and_delete();
+  }
 }
🧹 Nitpick comments (2)
src/nsolid/async_ts_queue.h (2)

250-254: Drop the redundant in-class initializer on batching_enabled_.

The member variable batching_enabled_ has a default initializer but is always overwritten in constructors. For consistency and clarity, remove the redundant initialization.

 // Batching options and timer
 AsyncTSQueueOptions opts_;
-bool batching_enabled_ = false;
+bool batching_enabled_;
 nsuv::ns_timer* timer_;

18-24: Consider adding validation and documentation for AsyncTSQueueOptions.

The struct would benefit from validation constraints and more detailed documentation about the behavior when values are 0.

 /**
  * Options for AsyncTSQueue batching notification
+ * 
+ * @field min_size Minimum queue size to trigger notification (0 = disabled)
+ * @field max_time Maximum time in milliseconds before notification (0 = disabled)
+ * 
+ * When both are specified, notification is triggered when either condition is met.
  */
 struct AsyncTSQueueOptions {
   uint64_t min_size = 0;  // Minimum queue size to trigger notification
   uint64_t max_time = 0;  // Maximum time (ms) before notification
 };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and ef5ca34.

📒 Files selected for processing (1)
  • src/nsolid/async_ts_queue.h (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: coverage-windows
  • GitHub Check: lint-js-and-md
  • GitHub Check: test-macOS
  • GitHub Check: build-tarball
  • GitHub Check: coverage-linux

@santigimeno
Member Author

@coderabbitai review

@coderabbitai

coderabbitai bot commented Jun 2, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@santigimeno
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Jun 2, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (2)
src/nsolid/async_ts_queue.h (2)

160-163: 🛠️ Refactor suggestion

batching_enabled_ should allow size-only OR time-only modes

The current check requires both min_size and max_time to be positive.
This prevents perfectly valid use-cases such as “flush after 100 items no matter
how long it takes” or “flush every 10 ms even for a single item”.

-    batching_enabled_ = (opts_.min_size > 0 && opts_.max_time > 0);
+    batching_enabled_ = (opts_.min_size > 0 || opts_.max_time > 0);

After changing this line remember to adapt the enqueue logic so that the timer
is only armed when opts_.max_time > 0.


94-102: 🛠️ Refactor suggestion

Batch trigger at size == 1 defeats the purpose of batching

With batching enabled the queue is drained immediately when the very first
element arrives (size == 1).
This provides no opportunity for additional items to accumulate and essentially
turns “batching” into “single-item + timer ping-pong”.

A more typical batching strategy is:

  • If size >= min_size → trigger immediately.
  • else if this is the first item → start the timer only.
-    if (batching_enabled_) {
-      if (size == 1 || size >= opts_.min_size) {
-        trigger_async();
-      }
-    } else {
+    if (batching_enabled_) {
+      if (opts_.min_size > 0 && size >= opts_.min_size) {
+        trigger_async();              // enough items – flush now
+      } else if (opts_.max_time > 0 && size == 1) {
+        start_timer();                // first item – arm the timer
+      }
+    } else {
       if (size == 1) {
         ASSERT_EQ(0, async_handle_->send());
       }
     }
🧹 Nitpick comments (1)
src/nsolid/async_ts_queue.h (1)

119-122: trigger_async() should cancel any pending timer

Once a batch is flushed we no longer need the outstanding timer. Cancel it to
avoid unnecessary wake-ups:

   void trigger_async() {
     ASSERT_EQ(0, async_handle_->send());
+    if (timer_ != nullptr && timer_->is_active())
+      ASSERT_EQ(0, timer_->stop());
   }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and dfe4179.

📒 Files selected for processing (1)
  • src/nsolid/async_ts_queue.h (7 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/nsolid/async_ts_queue.h (1)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:97-107
Timestamp: 2025-06-02T14:51:54.441Z
Learning: In nsuv timer implementation, timer_->start() is idempotent - it automatically stops the timer if needed before rearming it, eliminating the need for manual timer stopping.
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: test-tarball-linux
  • GitHub Check: coverage-linux
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: coverage-windows
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: test-macOS

@santigimeno
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Jun 2, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/nsolid/async_ts_queue.h (1)

162-174: batching_enabled_ should be ||, not &&

Previous reviews already pointed out that requiring both min_size and
max_time to be greater than zero disables pure size-based or pure time-based
batching modes. Unless this is intentional, change to:

batching_enabled_ = (opts_.min_size > 0 || opts_.max_time > 0);
🧹 Nitpick comments (1)
src/nsolid/async_ts_queue.h (1)

20-26: Include <cstdint> to guarantee uint64_t is defined

uint64_t is declared in <cstdint>/<stdint.h>. Relying on a transitive
include can silently break builds on some tool-chains. Add the header next to
the existing standard includes.

 #include <memory>
 #include <vector>
+#include <cstdint>   // uint64_t
 #include <functional>
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and 250ba6b.

📒 Files selected for processing (1)
  • src/nsolid/async_ts_queue.h (8 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/nsolid/async_ts_queue.h (1)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:97-107
Timestamp: 2025-06-02T14:51:54.441Z
Learning: In nsuv timer implementation, timer_->start() is idempotent - it automatically stops the timer if needed before rearming it, eliminating the need for manual timer stopping.
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: test-tarball-linux
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: coverage-linux
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: coverage-windows
  • GitHub Check: test-macOS

@santigimeno
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Jun 2, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/nsolid/async_ts_queue.h (1)

171-172: Enable batching when either threshold is configured, not only when both are

batching_enabled_ currently uses min_size > 0 && max_time > 0, preventing
pure size-based or pure time-based batching. Switching to logical OR allows
the three expected modes (size-only, time-only, combined).

-    batching_enabled_ = (opts_.min_size > 0 && opts_.max_time > 0);
+    batching_enabled_ = (opts_.min_size > 0 || opts_.max_time > 0);

(You’ll also need to guard start_timer() with if (opts_.max_time > 0) when
the timer is actually required.)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and e7050db.

📒 Files selected for processing (1)
  • src/nsolid/async_ts_queue.h (7 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/nsolid/async_ts_queue.h (1)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:97-107
Timestamp: 2025-06-02T14:51:54.441Z
Learning: In nsuv timer implementation, timer_->start() is idempotent - it automatically stops the timer if needed before rearming it, eliminating the need for manual timer stopping.
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: test-tarball-linux
  • GitHub Check: coverage-linux
  • GitHub Check: test-macOS
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: coverage-windows
🔇 Additional comments (1)
src/nsolid/async_ts_queue.h (1)

73-78: Destructor now cleans up the timer – nice catch!

timer_->close_and_delete() prevents the leak reported earlier and mirrors the
async handle cleanup.

@santigimeno
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Jun 2, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/nsolid/async_ts_queue.h (1)

162-176: batching_enabled_ only when both thresholds are set – breaks “size-only” / “time-only” modes

Earlier review round already flagged this, and the logic is unchanged. Requiring both min_size and max_time to be non-zero prevents users from opting into purely size-based or purely time-based batching, which the public docs claim is supported (“whichever occurs first”).

-    batching_enabled_ = (opts_.min_size > 0 && opts_.max_time > 0);
+    batching_enabled_ = (opts_.min_size > 0 || opts_.max_time > 0);

Follow-up changes needed in enqueue_impl():

-      if (size == 1) {
+      if (opts_.max_time > 0 && size == 1) {
         ...
-      } else if (size >= opts_.min_size) {
+      } else if (opts_.min_size > 0 && size >= opts_.min_size) {

(This is exactly the suggestion made in the previous review; marking as duplicate.)

🧹 Nitpick comments (6)
src/nsolid/async_ts_queue.h (6)

10-16: Include <cstdint> for the fixed-width integer types you introduce

AsyncTSQueueOptions uses uint64_t, but <cstdint> (or <stdint.h>) is not included in this header.
While most toolchains incidentally bring it in through other headers, relying on that is brittle and can break when the transitive include hierarchy changes.

 #include <memory>
 #include <vector>
+#include <cstdint>   // uint64_t
 #include <functional>
 #include <tuple>
 #include <type_traits>
 #include <atomic>

20-27: Clarify default-constructed options & validate at runtime

  1. A default-constructed AsyncTSQueueOptions{} currently disables batching (min_size == 0 && max_time == 0). That is fine, but worth documenting explicitly so users understand that both values must be set to enable batching under the current implementation.

  2. Consider adding a cheap runtime ASSERT (or at least a comment) to flag obviously invalid combinations, e.g. min_size == 0 && max_time == 0, if the intent is to require either field to be non-zero (see next comment).
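
A minimal form of the check suggested in point 2 (a sketch only, not the actual header; the struct mirrors the one added in this PR):

#include <cstdint>

struct AsyncTSQueueOptions {
  uint64_t min_size = 0;  // 0 = size threshold disabled
  uint64_t max_time = 0;  // 0 = time threshold (ms) disabled
};

// True when the options enable batching at all. Under the current
// implementation both thresholds must be positive, so a constructor could
// ASSERT this (or explicitly document the silent fallback to legacy mode).
inline bool batching_options_enabled(const AsyncTSQueueOptions& opts) {
  return opts.min_size > 0 && opts.max_time > 0;
}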


98-111: Edge-case: active timer not cancelled when min_size flushes immediately

When the queue reaches min_size, you clear timer_armed_ and trigger an async flush, but any already-running timer is left active.
That wastes a wake-up and can lead to surprising extra callbacks.

A minimal defensive fix:

       } else if (size >= opts_.min_size) {
         // Make sure we don't arm the timer if min size is reached, so we're
         // items are consumed in the async callback.
         timer_armed_.store(false, std::memory_order_release);
+        if (timer_ != nullptr && timer_->is_active()) {
+          /* safe – nsuv timers are single-shot */
+          ASSERT_EQ(0, timer_->stop());
+        }
         trigger_async();
       }

49-60: Use std::make_shared to avoid two separate allocations

Both factory functions create() use new + shared_ptr, leading to two allocations (control block + object). std::make_shared combines them and gives a small perf win:

-    SharedAsyncTSQueue queue(new AsyncTSQueue<T>(
-        loop, opts, std::forward<Cb>(cb), std::forward<Args>(args)...));
+    auto queue = std::make_shared<AsyncTSQueue<T>>(
+        loop, opts, std::forward<Cb>(cb), std::forward<Args>(args)...);

The same applies to the legacy factory version.
(If you rely on the incomplete type inside the constructor this is still safe because construction happens after allocation.)


74-78: Set timer_ to nullptr after closing to avoid accidental reuse

After close_and_delete() the pointer is dangling. Clearing it prevents accidental dereference in future maintenance:

   if (timer_) {
     timer_->close_and_delete();
+    timer_ = nullptr;
   }

270-275: bool batching_enabled_ should be const

After construction the flag never changes.
Marking it const communicates intent and enables compiler optimisations:

-  bool batching_enabled_ = false;
+  const bool batching_enabled_ = false;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and 7b83284.

📒 Files selected for processing (1)
  • src/nsolid/async_ts_queue.h (7 hunks)
🧰 Additional context used
🧠 Learnings (1)
src/nsolid/async_ts_queue.h (1)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:97-107
Timestamp: 2025-06-02T14:51:54.441Z
Learning: In nsuv timer implementation, timer_->start() is idempotent - it automatically stops the timer if needed before rearming it, eliminating the need for manual timer stopping.
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: test-tarball-linux
  • GitHub Check: test-macOS
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: coverage-linux
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: coverage-windows
🔇 Additional comments (1)
src/nsolid/async_ts_queue.h (1)

247-255: Possible spin when queue drains between async & timer start

In async_callback() you return after start_timer().
If the queue is emptied before the timer fires (e.g., producer thread flushes and no new items come), the timer still fires, sends another async, which then finds the queue empty and exits.
Not harmful, but a wasted wake-up.

A cheap optimisation: stop the timer if the queue becomes empty immediately after start_timer() or early-exit in the timer callback when queue_->empty().

Would you like a concrete diff demonstrating this micro-optimisation?
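
For reference, such a diff might look roughly like this (a sketch based on the start_timer() call quoted elsewhere in this thread; not taken verbatim from the header):

ASSERT_EQ(0, timer_->start(+[](nsuv::ns_timer*, WeakAsyncTSQueue queue_wp) {
  SharedAsyncTSQueue queue = queue_wp.lock();
  if (queue == nullptr || queue->queue_.empty()) {
    return;  // nothing left to flush; skip the redundant wake-up
  }

  queue->trigger_async();
}, opts_.max_time, 0, this->weak_from_this()));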

@santigimeno
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Jun 3, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
src/nsolid/async_ts_queue.h (2)

100-115: Race-free but still spurious timer wake-ups when min_size is reached

When size >= opts_.min_size, the timer may already be running.
The code clears timer_armed_ but does not cancel the active timer, so an
extra callback fires with an empty queue, wasting a wake-up (see the discussion
in prior reviews).

If you want to avoid the redundant wake-up while keeping the implementation
simple and thread-safe, you can stop the timer only when you know you are on
the loop thread (inside async_callback):

else if (size >= opts_.min_size) {
   timer_armed_.store(false, std::memory_order_release);
   trigger_async();
+  // The async callback runs on the loop thread – stop the timer there
}

and inside async_callback just after process():

if (batching_enabled_ && timer_->is_active())
  ASSERT_EQ(0, timer_->stop());

This is an optional optimisation; correctness is not affected.


170-180: Batching requires BOTH min_size and max_time – limits flexibility

batching_enabled_ is set only when both thresholds are positive. This
prevents users from selecting only size-based or only time-based
batching, which is a common use-case.

Switching the condition to opts.min_size > 0 || opts.max_time > 0 and
guarding each path accordingly would make the API more flexible without
breaking existing behaviour.

🧹 Nitpick comments (2)
src/nsolid/async_ts_queue.h (1)

240-247: async_callback() skips process() when queue drained by timer

The early return after start_timer() is correct for the first enqueue,
but after the timer fires and the queue is empty the same early exit will not
be taken (because timer_armed_ is false). This ensures process() is
invoked exactly once per flush – good!

However, if new items are enqueued during process(), the timer is not
re-armed until the producer pushes another item (size==1 path). That is fine
for most workloads but worth documenting as it differs from level-triggered
queues.

test/cctest/test_nsolid_async_ts_queue.cc (1)

597-603: Waiting predicate always false after processed.clear()

After clearing processed, the predicate processed.size() == 1 can never
become true, so the wait_for simply times out. Although the test still
passes, the intent (detect spurious callbacks) is obscured.

Consider waiting on call_count instead, or use processed.empty() to make
the intention explicit:

cv.wait_for(lk, 500ms, [&] { return call_count > 0; });
...
EXPECT_EQ(call_count, 0);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and 4060e62.

📒 Files selected for processing (2)
  • src/nsolid/async_ts_queue.h (7 hunks)
  • test/cctest/test_nsolid_async_ts_queue.cc (14 hunks)
🧰 Additional context used
🧠 Learnings (2)
src/nsolid/async_ts_queue.h (2)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:119-130
Timestamp: 2025-06-02T23:07:18.018Z
Learning: In AsyncTSQueue, batching_enabled_ is only true when both opts_.min_size > 0 AND opts_.max_time > 0. The start_timer() method is only called when batching_enabled_ is true, which ensures opts_.max_time > 0 is always satisfied when the timer is started.
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:97-107
Timestamp: 2025-06-02T14:51:54.461Z
Learning: In nsuv timer implementation, timer_->start() is idempotent - it automatically stops the timer if needed before rearming it, eliminating the need for manual timer stopping.
test/cctest/test_nsolid_async_ts_queue.cc (1)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:119-130
Timestamp: 2025-06-02T23:07:18.018Z
Learning: In AsyncTSQueue, batching_enabled_ is only true when both opts_.min_size > 0 AND opts_.max_time > 0. The start_timer() method is only called when batching_enabled_ is true, which ensures opts_.max_time > 0 is always satisfied when the timer is started.
🧬 Code Graph Analysis (1)
src/nsolid/async_ts_queue.h (1)
agents/zmq/src/zmq_agent.h (1)
  • loop_ (432-432)
⏰ Context from checks skipped due to timeout of 90000ms (8)
  • GitHub Check: test-tarball-linux
  • GitHub Check: lint-js-and-md
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: coverage-windows
  • GitHub Check: coverage-linux
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: test-macOS
🔇 Additional comments (2)
src/nsolid/async_ts_queue.h (2)

124-134: Guard start_timer() against zero or re-entrancy is no longer needed

Nice job simplifying the logic – start_timer() is now called only from the
loop thread and the timer is one-shot, so the previous re-entrancy concerns are
gone. The lambda correctly captures a weak pointer to avoid use-after-free
issues. Looks good!


75-82:

❓ Verification inconclusive

Potential cross-thread uv_close call in the destructor

async_handle_->close_and_delete() and timer_->close_and_delete() must run on
the same thread that owns the underlying uv_loop_t.
Because the queue is typically destroyed on the producer thread (tests do this
in the main thread while the loop lives on another thread), these calls may
violate libuv’s thread-affinity rules and lead to use-after-free or silent
memory leaks.

Consider one of the following:

~AsyncTSQueue() {
-  async_handle_->close_and_delete();
-  ...
+  // Dispatch close to the loop thread.
+  if (uv_loop_alive(loop_) != 0) {
+    uv_async_t* closer = new uv_async_t;
+    ASSERT_EQ(0, uv_async_init(loop_, closer,
+      +[](uv_async_t* h) {
+        auto* q = static_cast<AsyncTSQueue*>(h->data);
+        q->async_handle_->close_and_delete();
+        if (q->timer_ != nullptr)
+          q->timer_->close_and_delete();
+        uv_close(reinterpret_cast<uv_handle_t*>(h), [](uv_handle_t* h2){ delete h2; });
+      }));
+    closer->data = this;
+    uv_async_send(closer);
+  }
 }

At minimum, document that the destructor must be invoked on the loop thread
or provide a dedicated close() method to be called from there.


#!/bin/bash
# Locate and print the AsyncTSQueue class definition and its destructor context
rg --context 5 "class AsyncTSQueue" -n .

#!/bin/bash
# Print the AsyncTSQueue class definition including its destructor
rg --context 10 "class AsyncTSQueue" -n src/nsolid/async_ts_queue.h

# Locate every definition and usage of close_and_delete in the codebase
rg --context 5 "close_and_delete" -n src

Enforce libuv thread-affinity in AsyncTSQueue destructor

The destructor currently calls async_handle_->close_and_delete() and timer_->close_and_delete() directly. These must run on the thread that owns the uv_loop_t, otherwise you risk use-after-free or silent leaks if ~AsyncTSQueue() is invoked on the producer/main thread.

Please ensure one of the following:

  • Document that ~AsyncTSQueue() must be called on the loop thread.
  • Provide an explicit close() method that’s called on the loop thread.
  • Dispatch the close calls to the loop thread via uv_async_t:
~AsyncTSQueue() {
-  async_handle_->close_and_delete();
-  if (timer_) {
-    timer_->close_and_delete();
-  }
+  // Dispatch handle closures on the loop thread
+  if (uv_loop_alive(loop_)) {
+    auto* closer = new uv_async_t;
+    uv_async_init(loop_, closer,
+      [](uv_async_t* req) {
+        auto* self = static_cast<AsyncTSQueue*>(req->data);
+        self->async_handle_->close_and_delete();
+        if (self->timer_) self->timer_->close_and_delete();
+        uv_close(reinterpret_cast<uv_handle_t*>(req),
+                 [](uv_handle_t* h){ delete h; });
+      });
+    closer->data = this;
+    uv_async_send(closer);
+  }

@santigimeno
Member Author

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Jun 3, 2025

✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (5)
src/nsolid/async_ts_queue.h (4)

75-82: LGTM! Timer cleanup properly implemented.

The destructor correctly handles timer cleanup by calling close_and_delete() when the timer exists, preventing the memory leak issue raised in previous reviews. Setting pointers to nullptr after deletion is a good defensive practice.


103-119: ⚠️ Potential issue

Critical: Min-size logic doesn't trigger immediate flush

When size >= opts_.min_size, the queue should be flushed immediately. However, the current implementation only prevents the timer from being armed - it doesn't stop an already-running timer or ensure immediate processing.

The issue: If a timer was started when size was 1, and then size reaches min_size before the timer expires, the queue won't be flushed until the timer fires.

Apply this fix to ensure immediate flush when min_size is reached:

      } else if (size >= opts_.min_size) {
        // Make sure we don't arm the timer if min size is reached, so we're
        // items are consumed in the async callback.
        timer_armed_.store(false, std::memory_order_release);
+       // Stop any active timer to prevent delayed processing
+       if (timer_ != nullptr && timer_->is_active()) {
+         ASSERT_EQ(0, timer_->stop());
+       }
        trigger_async();
      }

124-135: ⚠️ Potential issue

Potential issue: Timer callback unconditionally triggers async

The timer callback calls trigger_async() without checking if the queue is empty. Combined with the async callback's timer restart logic, this could lead to unnecessary timer cycles when the queue is empty.

Consider adding a queue empty check in the timer callback:

    ASSERT_EQ(0, timer_->start(+[](nsuv::ns_timer*, WeakAsyncTSQueue queue_wp) {
      SharedAsyncTSQueue queue = queue_wp.lock();
      if (queue == nullptr) {
        return;
      }

-     queue->trigger_async();
+     // Only trigger if there are items to process
+     if (!queue->queue_.empty()) {
+       queue->trigger_async();
+     }
    }, opts_.max_time, 0, this->weak_from_this()));

247-255: ⚠️ Potential issue

Timer restart logic prevents immediate processing

The async callback only starts the timer when timer_armed_ is true, but doesn't call process() in that case. This prevents immediate processing when min_size is reached, even though timer_armed_ was set to false in enqueue_impl.

The logic should process immediately when timer_armed_ is false (indicating a deliberate flush).
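
A sketch of the shape this comment is asking for (member and helper names are taken from elsewhere in this thread; the real callback signature and body may differ):

void async_callback() {
  if (batching_enabled_ &&
      timer_armed_.exchange(false, std::memory_order_acq_rel)) {
    // First item of a new batch: arm the one-shot timer and wait for more.
    start_timer();
    return;
  }

  // timer_armed_ was already false, meaning the timer fired or min_size was
  // reached in enqueue_impl(), so drain the queue immediately.
  process();
}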

test/cctest/test_nsolid_async_ts_queue.cc (1)

28-43: LGTM! Proper event loop lifecycle management

The test fixture correctly:

  1. Initializes the UV loop
  2. Runs it in a dedicated thread
  3. Uses async handle for clean shutdown
  4. Properly closes the loop in TearDown (addressing the previous review comment)
🧹 Nitpick comments (2)
src/nsolid/async_ts_queue.h (1)

234-240: Important design limitation documented

The comment correctly documents that new items enqueued during process() won't automatically re-arm the timer, potentially causing delays. This edge-triggered behavior differs from level-triggered designs.

Consider documenting this behavior in the public API documentation or class comments as well, since it affects how users should design their callback handlers. If callbacks enqueue new items, users might need to manually trigger another notification.
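
One possible wording for such a note, as a class-level comment (suggested text only, not taken from the header):

// NOTE: batching is edge-triggered. Items enqueued from inside the process()
// callback do not re-arm the timer on their own; they are flushed the next
// time enqueue() observes size == 1 (arming the timer) or size >= min_size
// (immediate flush).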

test/cctest/test_nsolid_async_ts_queue.cc (1)

9-19: Fix lint warnings for C++11 headers

The NOLINTNEXTLINE comments suggest these headers are flagged by the linter. Consider using the project's standard approach for including C++11 headers or updating the lint configuration.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3dc12d9 and 6870fa5.

📒 Files selected for processing (2)
  • src/nsolid/async_ts_queue.h (7 hunks)
  • test/cctest/test_nsolid_async_ts_queue.cc (14 hunks)
🧰 Additional context used
🧠 Learnings (2)
src/nsolid/async_ts_queue.h (2)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:119-130
Timestamp: 2025-06-02T23:07:18.018Z
Learning: In AsyncTSQueue, batching_enabled_ is only true when both opts_.min_size > 0 AND opts_.max_time > 0. The start_timer() method is only called when batching_enabled_ is true, which ensures opts_.max_time > 0 is always satisfied when the timer is started.
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:97-107
Timestamp: 2025-06-02T14:51:54.461Z
Learning: In nsuv timer implementation, timer_->start() is idempotent - it automatically stops the timer if needed before rearming it, eliminating the need for manual timer stopping.
test/cctest/test_nsolid_async_ts_queue.cc (1)
Learnt from: santigimeno
PR: nodesource/nsolid#319
File: src/nsolid/async_ts_queue.h:119-130
Timestamp: 2025-06-02T23:07:18.018Z
Learning: In AsyncTSQueue, batching_enabled_ is only true when both opts_.min_size > 0 AND opts_.max_time > 0. The start_timer() method is only called when batching_enabled_ is true, which ensures opts_.max_time > 0 is always satisfied when the timer is started.
⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: test-tarball-linux
  • GitHub Check: test-macOS
  • GitHub Check: coverage-linux
  • GitHub Check: coverage-linux-without-intl
  • GitHub Check: test-linux (ubuntu-24.04)
  • GitHub Check: test-linux (ubuntu-24.04-arm)
  • GitHub Check: coverage-windows
🔇 Additional comments (12)
src/nsolid/async_ts_queue.h (7)

10-14: LGTM!

The added includes are necessary for the new batching functionality - atomic for thread-safe timer state, cstdint for fixed-width integers in the options struct, and standard containers for the implementation.


21-28: LGTM!

The AsyncTSQueueOptions struct is well-designed with sensible defaults (0 = disabled) and clear documentation. The naming clearly indicates the purpose of each field.


50-61: LGTM!

The new factory method properly forwards the options and maintains the same perfect forwarding pattern as the legacy method. Good separation of concerns between options-based and legacy behavior.


62-71: LGTM!

The legacy factory method correctly creates default options to maintain backward compatibility. This ensures existing code continues to work without modifications.


170-181: LGTM!

The constructor properly initializes all members including the new batching-related fields. The batching_enabled_ logic correctly requires both conditions per the documented behavior.


227-231: LGTM!

Timer initialization is properly guarded by the batching_enabled_ flag and correctly initialized with the event loop.


271-275: LGTM!

Member variables are properly declared with appropriate types and initialization. The atomic bool for timer_armed_ ensures thread-safe access.

test/cctest/test_nsolid_async_ts_queue.cc (5)

461-496: LGTM! Comprehensive test for size-based batching

The test properly verifies:

  • Items below min_size don't trigger processing
  • Reaching min_size triggers immediate batch processing
  • All items are processed in a single batch

Good use of timeouts to ensure non-triggering behavior.


498-522: LGTM! Validates time-based batching

The test correctly verifies that items are processed after max_time expires even when min_size isn't reached. The 200ms wait is appropriate for a 50ms timer.


582-617: Good test for timer lifecycle

This test validates that the timer doesn't continue firing after the queue is empty, which was a concern raised in previous reviews. The test structure properly checks for spurious wakeups.


620-645: LGTM! Thread safety validation

The test properly validates concurrent enqueuing from multiple threads and ensures all items are processed. Good use of sorting to verify no items were lost.


524-579: ⚠️ Potential issue

Test reveals batching design issue

The test expects 3 separate batches when items are enqueued slowly (batch_count == 3), but this contradicts the typical batching behavior where items should accumulate until min_size or max_time is reached.

The current implementation appears to process each item individually when enqueued slowly, which defeats the purpose of time-based batching.

The test seems to confirm the issue mentioned in previous reviews - that items are processed immediately rather than being batched by time. Consider whether this test reflects the intended behavior or if the implementation needs fixing.

Likely an incorrect or invalid review comment.

@santigimeno santigimeno force-pushed the santi/improve_async_ts_queue_3 branch from 6870fa5 to 1793381 on June 3, 2025 11:36
@RafaelGSS RafaelGSS left a comment
Member

RSLGTM

@santigimeno santigimeno force-pushed the santi/improve_async_ts_queue_2 branch from 3dc12d9 to b7ddf91 on June 5, 2025 21:21
Introduce AsyncTSQueueOptions for batching: notification is sent when
the queue size reaches min_size or after max_time ms, whichever comes
first.
Implements timer-based batching using nsuv::ns_timer, with all timer and
notification logic simplified for clarity and efficiency.
Refactor cctests so the event loop runs on a separate thread. Also add new
tests covering the new functionality.
@santigimeno santigimeno changed the base branch from santi/improve_async_ts_queue_2 to node-v22.x-nsolid-v5.x June 5, 2025 21:28
@santigimeno santigimeno force-pushed the santi/improve_async_ts_queue_3 branch from 1793381 to 0456808 on June 5, 2025 21:47
santigimeno added a commit that referenced this pull request Jun 5, 2025
Introduce AsyncTSQueueOptions for batching: notification is sent when
the queue size reaches min_size or after max_time ms, whichever comes
first.
Implements timer-based batching using nsuv::ns_timer, with all timer and
notification logic simplified for clarity and efficiency.
Refactor cctests so the event loop runs on a separate thread. Also add new
tests covering the new functionality.

PR-URL: #319
Reviewed-By: Rafael Gonzaga <[email protected]>
@santigimeno
Member Author

Landed in 308046b

@santigimeno santigimeno closed this Jun 5, 2025
@santigimeno santigimeno deleted the santi/improve_async_ts_queue_3 branch June 5, 2025 21:48
santigimeno added a commit that referenced this pull request Aug 25, 2025
Introduce AsyncTSQueueOptions for batching: notification is sent when
the queue size reaches min_size or after max_time ms, whichever comes
first.
Implements timer-based batching using nsuv::ns_timer, with all timer and
notification logic simplified for clarity and efficiency.
Refactor cctests so the event loop runs on a separate thread. Also add new
tests covering the new functionality.

PR-URL: #319
Reviewed-By: Rafael Gonzaga <[email protected]>
santigimeno added a commit that referenced this pull request Aug 26, 2025
Introduce AsyncTSQueueOptions for batching: notification is sent when
the queue size reaches min_size or after max_time ms, whichever comes
first.
Implements timer-based batching using nsuv::ns_timer, with all timer and
notification logic simplified for clarity and efficiency.
Refactor cctests so the event loop runs on a separate thread. Also add new
tests covering the new functionality.

PR-URL: #319
Reviewed-By: Rafael Gonzaga <[email protected]>
PR-URL: #359
Reviewed-By: Juan José Arboleda <[email protected]>