diff --git a/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py b/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py index 6cefa6937297..212e01e97793 100644 --- a/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py +++ b/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py @@ -453,7 +453,7 @@ unit="s", targets=[ Target( - expr='sum(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)', + expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m])', legend="Block Generation Time: {{dataset}}, {{operator}}", ) ], @@ -468,7 +468,7 @@ unit="s", targets=[ Target( - expr='sum(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)', + expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m])', legend="Backpressure Time: {{dataset}}, {{operator}}", ) ], diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 5b3e216fb4cb..22f4c390d49a 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -277,8 +277,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None): stats_summary_string = self._final_stats.to_summary().to_string( include_parent=False ) - # Reset the scheduling loop duration gauge. - self._sched_loop_duration_s.set(0, tags={"dataset": self._dataset_id}) + # Reset the scheduling loop duration gauge + resource manager budgets/usages. + self._resource_manager.update_usages() + self.update_metrics(0) if self._data_context.enable_auto_log_stats: logger.info(stats_summary_string) # Close the progress manager with a finishing message. @@ -387,7 +388,12 @@ def update_metrics(self, sched_loop_duration: int): def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]): budget = self._resource_manager.get_budget(op) - if budget is not None: + if budget is None: + cpu_budget = 0 + gpu_budget = 0 + memory_budget = 0 + object_store_memory_budget = 0 + else: # Convert inf to -1 to represent unlimited budget in metrics cpu_budget = -1 if math.isinf(budget.cpu) else budget.cpu gpu_budget = -1 if math.isinf(budget.gpu) else budget.gpu @@ -397,10 +403,11 @@ def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]): if math.isinf(budget.object_store_memory) else budget.object_store_memory ) - self._cpu_budget_gauge.set(cpu_budget, tags=tags) - self._gpu_budget_gauge.set(gpu_budget, tags=tags) - self._memory_budget_gauge.set(memory_budget, tags=tags) - self._osm_budget_gauge.set(object_store_memory_budget, tags=tags) + + self._cpu_budget_gauge.set(cpu_budget, tags=tags) + self._gpu_budget_gauge.set(gpu_budget, tags=tags) + self._memory_budget_gauge.set(memory_budget, tags=tags) + self._osm_budget_gauge.set(object_store_memory_budget, tags=tags) def _update_max_bytes_to_read_metric( self, op: PhysicalOperator, tags: Dict[str, str]