Merged
Changes from 4 commits
@@ -451,7 +451,7 @@
unit="seconds",
targets=[
Target(
- expr='sum(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
+ expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m])',
@iamjustinhsu (Contributor, Author) commented on Oct 6, 2025:

W/O PR: shows the total sum of block generation time (meaningless).
W/ PR: shows the average block generation time over a 5-minute window.

Bug: Prometheus Query Division by Zero

The 'Block Generation Time' panel's Prometheus query divides by increase(ray_data_num_task_outputs_generated[5m]). This denominator can be zero when no task outputs are generated in a 5-minute window, leading to NaN/Inf values or dashboard display issues.
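
Not part of this PR, but a minimal sketch of one common PromQL guard for the zero-denominator case flagged above, written in the same Target(...) style as the panel definitions: comparing the denominator with > 0 drops samples where no task outputs were generated in the window, so the ratio is simply absent there instead of NaN/Inf. The same guard would apply to the Task Submission Backpressure Time query further down.

# Hypothetical variant of the target above (not the PR's actual change); the "> 0"
# comparison filters out zero denominators so the division never yields NaN/Inf.
Target(
    expr=(
        'increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m])'
        ' / (increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m]) > 0)'
    ),
    legend="Block Generation Time: {{dataset}}, {{operator}}",
)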


legend="Block Generation Time: {{dataset}}, {{operator}}",
)
],
@@ -466,7 +466,7 @@
unit="seconds",
targets=[
Target(
- expr='sum(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
+ expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m])',
@iamjustinhsu (Contributor, Author) commented on Oct 6, 2025:

W/O PR: shows the total sum of task submission backpressure time (could be meaningful).
W/ PR: shows the average backpressure time per submitted task over a 5-minute window (I find this more meaningful).

Bug: Prometheus Query Division by Zero Issue

The Prometheus query for the "Task Submission Backpressure Time" panel can result in division by zero. If no tasks are submitted within a 5-minute window, the denominator increase(ray_data_num_tasks_submitted...[5m]) becomes zero, leading to NaN or undefined values on the dashboard.

legend="Backpressure Time: {{dataset}}, {{operator}}",
)
],
10 changes: 10 additions & 0 deletions python/ray/data/_internal/execution/resource_manager.py
@@ -330,6 +330,16 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
return None
return self._op_resource_allocator.get_budget(op)

def clear_usages_and_budget(self):
"""Resets the usages and budgets. This function should only be called on
executor shutdown.
"""
assert isinstance(self.op_resource_allocator, ReservationOpResourceAllocator)
for op in self._op_usages:
self._op_usages[op] = ExecutionResources.zero()
self.op_resource_allocator._op_budgets[op] = ExecutionResources.zero()
self.op_resource_allocator._output_budgets[op] = 0
Bug: Null Resource Allocator Causes Shutdown Failures

The clear_usages_and_budget method assumes op_resource_allocator always exists and is a ReservationOpResourceAllocator. This causes an AssertionError when _op_resource_allocator is None (e.g., when resource allocation is disabled), leading to failures during executor shutdown.
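
A minimal sketch of one possible guard for this (not necessarily the change the author landed; see the review thread below), assuming the private attribute names visible in this diff and in get_budget above:

def clear_usages_and_budget(self):
    """Resets the usages and budgets. Should only be called on executor shutdown."""
    for op in self._op_usages:
        self._op_usages[op] = ExecutionResources.zero()
    # _op_resource_allocator can be None (e.g. when resource allocation is disabled),
    # and it may not be a ReservationOpResourceAllocator, so only reset budgets when it is.
    allocator = self._op_resource_allocator
    if isinstance(allocator, ReservationOpResourceAllocator):
        for op in allocator._op_budgets:
            allocator._op_budgets[op] = ExecutionResources.zero()
        for op in allocator._output_budgets:
            allocator._output_budgets[op] = 0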


A Member commented:

This is brittle and breaks abstraction barriers. If we change the implementation of ReservationOpResourceAllocator or change the allocator altogether, this could break.

@iamjustinhsu (Contributor, Author) replied:

good shout, updated



class OpResourceAllocator(ABC):
"""An interface for dynamic operator resource allocation.
5 changes: 3 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -258,8 +258,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None):
stats_summary_string = self._final_stats.to_summary().to_string(
include_parent=False
)
- # Reset the scheduling loop duration gauge.
- self._sched_loop_duration_s.set(0, tags={"dataset": self._dataset_id})
A Contributor commented:

is this meant to be nuked?

@iamjustinhsu (Contributor, Author) replied:

yes, update_metrics calls it

+ # Reset the scheduling loop duration gauge + resource manager budgets/usages.
+ self._resource_manager.clear_usages_and_budget()
A Member commented:

@iamjustinhsu if we call the regular update_usages here, does that clear the budget and usages? If not, why?

@iamjustinhsu (Contributor, Author) replied on Oct 21, 2025:

Not entirely, because the budget metrics are updated outside of the resource manager. On shutdown, the budget gauges would still show a line at the last updated budget/usage, which will be non-zero.

@iamjustinhsu (Contributor, Author) replied on Oct 21, 2025:

ok, fixed it with the latest commits. The reason the budget wasn't resetting was due to how we clear the budgets for each operator when we call update_usages. See #57246 (comment).

+ self.update_metrics(0)
Bug: Executor Shutdown Fails to Reset Budget Gauges

During executor shutdown, update_usages() sets operator budgets to None. Since _update_budget_metrics() only updates gauges when the budget is not None, budget gauges are not reset to 0 and retain their last non-zero values.
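
A minimal sketch of the reset behavior this describes, with hypothetical names: _update_budget_metrics is not shown in this diff, and the gauge attributes and tag keys below are placeholders for whatever gauges it actually drives. The idea is to treat a missing budget as zero when shutting down instead of skipping the gauge update.

# Hypothetical shape of the helper discussed above; names other than
# ExecutionResources and the .set(value, tags=...) gauge API are placeholders.
def _update_budget_metrics(self, op, budget, shutting_down=False):
    if budget is None:
        if not shutting_down:
            # While running, a missing budget means "leave the gauges as-is".
            return
        # On shutdown, zero the gauges instead of leaving stale non-zero values.
        budget = ExecutionResources.zero()
    tags = {"dataset": self._dataset_id, "operator": op.name}
    self._budget_cpu_gauge.set(budget.cpu, tags=tags)
    self._budget_gpu_gauge.set(budget.gpu, tags=tags)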


if self._data_context.enable_auto_log_stats:
logger.info(stats_summary_string)
# Close the progress bars from top to bottom to avoid them jumping