@@ -453,7 +453,7 @@
unit="s",
targets=[
Target(
-expr='sum(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
+expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m])',
@iamjustinhsu (Contributor, Author) commented on Oct 6, 2025:
Without the PR: the panel shows the total cumulative block generation time, which is meaningless on its own.
With the PR: it shows the average block generation time per task output over the trailing 5-minute window.

Another review comment:
Bug: Prometheus Query Division by Zero

The 'Block Generation Time' panel's Prometheus query divides by increase(ray_data_num_task_outputs_generated[5m]). This denominator can be zero when no task outputs are generated in a 5-minute window, leading to NaN/Inf values or dashboard display issues.

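One way to guard the query (a sketch, not what this PR merged) is to filter the denominator with PromQL's `!= 0` comparison, which drops series whose 5-minute increase is zero instead of dividing into NaN/Inf. The `guarded_expr` name below is illustrative; the query strings reuse the panel's placeholders from the diff above:

```python
# Sketch: guard the denominator so a zero 5-minute increase drops the sample
# rather than producing NaN/Inf. In PromQL, `!= 0` (without `bool`) filters
# the right-hand vector, so the division only happens where outputs exist.
guarded_expr = (
    'increase(ray_data_block_generation_time'
    '{{{global_filters}, operator=~"$Operator"}}[5m])'
    ' / (increase(ray_data_num_task_outputs_generated'
    '{{{global_filters}, operator=~"$Operator"}}[5m]) != 0)'
)
# This would be dropped into Target(expr=guarded_expr, ...) as in the panel above.
```

Whether a guard is needed in practice depends on the panel: Grafana usually renders NaN as a gap rather than an error.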

legend="Block Generation Time: {{dataset}}, {{operator}}",
)
],
@@ -468,7 +468,7 @@
unit="s",
targets=[
Target(
-expr='sum(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
+expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m])',
@iamjustinhsu (Contributor, Author) commented on Oct 6, 2025:
Without the PR: the panel shows the total cumulative backpressure time (could be meaningful).
With the PR: it shows the average backpressure time per submitted task over the trailing 5-minute window (I find this more meaningful).

Another review comment:

Bug: Prometheus Query Division by Zero

The Prometheus query for the "Task Submission Backpressure Time" panel can result in division by zero. If no tasks are submitted within a 5-minute window, the denominator increase(ray_data_num_tasks_submitted...[5m]) becomes zero, leading to NaN or undefined values on the dashboard.

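The same guard pattern would apply here (again a sketch, not the PR's change):

```python
# Sketch mirroring the guard above: drop samples where no tasks were
# submitted in the 5-minute window instead of dividing by zero.
guarded_expr = (
    'increase(ray_data_task_submission_backpressure_time'
    '{{{global_filters}, operator=~"$Operator"}}[5m])'
    ' / (increase(ray_data_num_tasks_submitted'
    '{{{global_filters}, operator=~"$Operator"}}[5m]) != 0)'
)
```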

legend="Backpressure Time: {{dataset}}, {{operator}}",
)
],
python/ray/data/_internal/execution/streaming_executor.py (14 additions & 7 deletions)
@@ -277,8 +277,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None):
stats_summary_string = self._final_stats.to_summary().to_string(
include_parent=False
)
-# Reset the scheduling loop duration gauge.
-self._sched_loop_duration_s.set(0, tags={"dataset": self._dataset_id})
A contributor commented:

is this meant to be nuked?

@iamjustinhsu (Contributor, Author) replied:

yes, update_metrics calls it
+# Reset the scheduling loop duration gauge + resource manager budgets/usages.
+self._resource_manager.update_usages()
+self.update_metrics(0)
Another review comment:
Bug: Executor Shutdown Fails to Reset Budget Gauges

During executor shutdown, update_usages() sets operator budgets to None. Since _update_budget_metrics() only updates gauges when the budget is not None, budget gauges are not reset to 0 and retain their last non-zero values.

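To make the failure mode concrete, here is a minimal, self-contained repro of the behavior described above. `Gauge` and `update_budget_metrics` are simplified stand-ins, not Ray's actual classes:

```python
# Stand-in for a metrics gauge: remembers the last value written to it.
class Gauge:
    def __init__(self):
        self.value = 0.0

    def set(self, value, tags=None):
        self.value = value


cpu_budget_gauge = Gauge()


def update_budget_metrics(budget):
    # Pre-PR logic: the gauge is only written when a budget exists, so a
    # None budget (as set by update_usages() during shutdown) is skipped.
    if budget is not None:
        cpu_budget_gauge.set(budget)


update_budget_metrics(4.0)   # while running: gauge reads 4.0
update_budget_metrics(None)  # at shutdown: write is skipped
print(cpu_budget_gauge.value)  # still 4.0 -- stale, never reset to 0
```

The hunks below address this by substituting explicit zeros when the budget is None and moving the gauge writes outside the conditional.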

if self._data_context.enable_auto_log_stats:
logger.info(stats_summary_string)
# Close the progress manager with a finishing message.
@@ -387,7 +388,12 @@ def update_metrics(self, sched_loop_duration: int):

def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]):
budget = self._resource_manager.get_budget(op)
-if budget is not None:
+if budget is None:
+    cpu_budget = 0
+    gpu_budget = 0
+    memory_budget = 0
+    object_store_memory_budget = 0
+else:
# Convert inf to -1 to represent unlimited budget in metrics
cpu_budget = -1 if math.isinf(budget.cpu) else budget.cpu
gpu_budget = -1 if math.isinf(budget.gpu) else budget.gpu
@@ -397,10 +403,11 @@ def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]):
if math.isinf(budget.object_store_memory)
else budget.object_store_memory
)
-    self._cpu_budget_gauge.set(cpu_budget, tags=tags)
-    self._gpu_budget_gauge.set(gpu_budget, tags=tags)
-    self._memory_budget_gauge.set(memory_budget, tags=tags)
-    self._osm_budget_gauge.set(object_store_memory_budget, tags=tags)

+self._cpu_budget_gauge.set(cpu_budget, tags=tags)
+self._gpu_budget_gauge.set(gpu_budget, tags=tags)
+self._memory_budget_gauge.set(memory_budget, tags=tags)
+self._osm_budget_gauge.set(object_store_memory_budget, tags=tags)

def _update_max_bytes_to_read_metric(
self, op: PhysicalOperator, tags: Dict[str, str]