fix: Batch RuntimeSubscriber updates for all nodes #4932
@@ -32,7 +32,7 @@ impl ProgressBarColor {
 Self::Blue => "blue",
 Self::Magenta => "magenta",
 Self::Cyan => "cyan",
-Self::Red => "red",
+Self::Yellow => "yellow",
Yellow looks better in the terminal than red :)
@@ -134,6 +135,8 @@ impl RuntimeStatsManager {
 let event_loop = async move {
 let mut interval = interval(throttle_interval);
 let mut active_nodes = HashSet::with_capacity(node_stats.len());
+// Reuse container for ticks
+let mut snapshot_container = Vec::with_capacity(node_stats.len());
Maybe premature, but easy change.
Codecov Report
@@ Coverage Diff @@
## main #4932 +/- ##
=======================================
Coverage ? 77.55%
=======================================
Files ? 906
Lines ? 130215
Branches ? 0
=======================================
Hits ? 100993
Misses ? 29222
Partials ? 0
Greptile Summary
This PR refactors the runtime statistics system in Daft's local execution engine to batch subscriber updates instead of sending individual updates per active node. The core change modifies the RuntimeStatsSubscriber trait interface from handle_event(&self, event: &StatSnapshotSend, node_info: &NodeInfo) to handle_event(&self, events: &[(&NodeInfo, StatSnapshotSend)]), requiring all subscriber implementations to be updated.
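The interface change described above can be sketched roughly as follows. This is a minimal illustration, not Daft's actual code: the NodeInfo and StatSnapshotSend structs are simplified stand-ins with made-up fields, the Send + Sync bound is an assumption, and BatchSizeRecorder is a hypothetical subscriber that exists only for this example.

```rust
use std::sync::Mutex;

// Hypothetical stand-ins; Daft's real NodeInfo and StatSnapshotSend differ.
#[derive(Debug)]
pub struct NodeInfo {
    pub id: usize,
}

#[derive(Debug, Clone, PartialEq)]
pub struct StatSnapshotSend {
    pub rows_emitted: u64,
}

// Old shape, for comparison (one call per active node):
//   fn handle_event(&self, event: &StatSnapshotSend, node_info: &NodeInfo);

// New shape: one call per tick carrying every active node's snapshot.
// The Send + Sync bound is an assumption made for this sketch.
pub trait RuntimeStatsSubscriber: Send + Sync {
    fn handle_event(&self, events: &[(&NodeInfo, StatSnapshotSend)]);
}

// Hypothetical subscriber that only records how many events each batch held.
#[derive(Default)]
pub struct BatchSizeRecorder {
    pub batch_sizes: Mutex<Vec<usize>>,
}

impl RuntimeStatsSubscriber for BatchSizeRecorder {
    fn handle_event(&self, events: &[(&NodeInfo, StatSnapshotSend)]) {
        self.batch_sizes.lock().unwrap().push(events.len());
    }
}
```

With this shape, a subscriber that wants per-node behavior iterates over the slice, while one that prefers to ship everything in a single message can use the slice as a unit.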
The key architectural improvement occurs in RuntimeStatsManager, where the periodic interval tick now collects all active node statistics into a reusable snapshot_container vector and sends them as a single batch to each subscriber. This replaces N separate subscriber calls per tick (where N is the number of active nodes) with a single batched call.
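The tick behavior described above might look something like the sketch below. It is synchronous and uses only the standard library, whereas the real manager runs inside an async event loop; the tick function, the Subscriber trait, the Recorder type, and the way stats are stored per node are all assumptions made for illustration.

```rust
use std::collections::HashSet;

// Hypothetical simplified types, not Daft's definitions.
#[derive(Debug)]
struct NodeInfo {
    id: usize,
}

#[derive(Debug, Clone, Copy, PartialEq)]
struct Snapshot {
    rows: u64,
}

trait Subscriber {
    fn handle_event(&mut self, events: &[(&NodeInfo, Snapshot)]);
}

// Hypothetical subscriber that counts calls and events for the example.
#[derive(Default)]
struct Recorder {
    calls: usize,
    events_seen: usize,
}

impl Subscriber for Recorder {
    fn handle_event(&mut self, events: &[(&NodeInfo, Snapshot)]) {
        self.calls += 1;
        self.events_seen += events.len();
    }
}

// One simulated tick: collect a snapshot per active node into `container`,
// then hand the whole slice to each subscriber exactly once.
fn tick<'a>(
    nodes: &'a [(NodeInfo, u64)],
    active: &HashSet<usize>,
    container: &mut Vec<(&'a NodeInfo, Snapshot)>,
    subscribers: &mut [&mut dyn Subscriber],
) {
    // clear() drops the old entries but keeps the allocation for reuse.
    container.clear();
    for (info, rows) in nodes {
        if active.contains(&info.id) {
            container.push((info, Snapshot { rows: *rows }));
        }
    }
    for sub in subscribers.iter_mut() {
        sub.handle_event(container.as_slice());
    }
}
```

Allocating the container once with capacity for every node and clearing it each tick avoids a fresh allocation per interval, which is the reuse the second diff hunk introduces.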
All subscriber implementations have been updated to handle the new batched interface:
- DashboardSubscriber: Iterates through batched events and sends each through its internal channel
- ProgressBarSubscriber: Updates multiple progress bars in a single method call and changes streaming sink node color from Red to Yellow
- OpenTelemetrySubscriber: Processes multiple events to record metrics with proper attributes
- DebugSubscriber: Prints all events in the batch for debugging purposes
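As a rough illustration of the "iterate and forward" style attributed to DashboardSubscriber above, the sketch below forwards each event of a batch through a channel. It uses std::sync::mpsc rather than whatever channel the real subscriber uses, and ChannelSubscriber, NodeInfo, and Snapshot are hypothetical names invented for this example.

```rust
use std::sync::mpsc::Sender;

// Hypothetical simplified types, not Daft's definitions.
struct NodeInfo {
    id: usize,
}

#[derive(Debug, Clone, PartialEq)]
struct Snapshot {
    rows: u64,
}

// Hypothetical subscriber that forwards each event in a batch individually.
struct ChannelSubscriber {
    tx: Sender<(usize, Snapshot)>,
}

impl ChannelSubscriber {
    fn handle_event(&self, events: &[(&NodeInfo, Snapshot)]) {
        for (info, snapshot) in events {
            // A send error only means the receiver is gone, so this sketch
            // drops the event instead of treating it as fatal.
            let _ = self.tx.send((info.id, snapshot.clone()));
        }
    }
}
```

A subscriber with a network transport could instead serialize the whole slice into one message, which is the case the PR description mentions for Flotilla.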
The PR also includes important cleanup improvements: the finish() method now properly accepts a Runtime parameter to block on task completion during shutdown, ensuring all pending statistics are flushed. Additionally, a bug fix was added to ensure stats_manager.finalize_node() is called when source node send operations fail, preventing memory leaks from incomplete node finalization.
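The finalize-on-failure fix mentioned above can be pictured with the following sketch. It assumes a much simpler stats manager than the real one (a set of active node ids) and a std::sync::mpsc channel standing in for the source node's output; StatsManager, activate_node, and run_source are hypothetical, and only the finalize_node name comes from the summary.

```rust
use std::collections::HashSet;
use std::sync::mpsc::Sender;

// Hypothetical manager that tracks which nodes are still active.
struct StatsManager {
    active: HashSet<usize>,
}

impl StatsManager {
    fn activate_node(&mut self, id: usize) {
        self.active.insert(id);
    }

    fn finalize_node(&mut self, id: usize) {
        self.active.remove(&id);
    }
}

// Sends batches downstream and returns how many were delivered.
// The node is finalized on the early-exit path as well as on completion.
fn run_source(
    node_id: usize,
    batches: Vec<u64>,
    tx: &Sender<u64>,
    stats: &mut StatsManager,
) -> usize {
    stats.activate_node(node_id);
    let mut sent = 0;
    for batch in batches {
        if tx.send(batch).is_err() {
            // Receiver hung up: finalize before bailing out so the node
            // is not left registered as active.
            stats.finalize_node(node_id);
            return sent;
        }
        sent += 1;
    }
    stats.finalize_node(node_id);
    sent
}
```

Without the call on the error path, a node whose downstream consumer has gone away would stay in the active set indefinitely in this toy model.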
A minor dependency update bumps indicatif from version 0.17 to 0.18, likely to support enhanced progress bar functionality required by the batching changes.
Confidence score: 4/5
- This PR is safe to merge with low risk as it primarily refactors internal interfaces without changing core functionality
- Score reflects well-structured changes with consistent updates across all subscriber implementations, though the breaking API change requires careful coordination
- Pay close attention to the RuntimeStatsManager finish method changes and subscriber interface updates to ensure proper shutdown behavior
9 files reviewed, no comments
Changes Made
Rather than sending updates per active node to runtime subscribers, we should just send them all at once and let the subscriber decide what to do (whether to iterate or not). This is useful for the Flotilla subscriber because it allows us to send one packet rather than batching internally.
Also includes changes from the logging PR, since they should be fixed ASAP and landing them here can help reduce that PR to narrow down the segfault.