From 436271807304306631eda19d4de4c22d29a072de Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 6 Oct 2025 16:49:55 -0700 Subject: [PATCH 01/13] [data] reset metrics on executor shutdown Signed-off-by: iamjustinhsu --- .../metrics/dashboards/data_dashboard_panels.py | 4 ++-- .../ray/data/_internal/execution/resource_manager.py | 10 +++++++++- .../ray/data/_internal/execution/streaming_executor.py | 5 +++-- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py b/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py index b9b9e14abc17..032af17d1e19 100644 --- a/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py +++ b/python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py @@ -451,7 +451,7 @@ unit="seconds", targets=[ Target( - expr='sum(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)', + expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m])', legend="Block Generation Time: {{dataset}}, {{operator}}", ) ], @@ -466,7 +466,7 @@ unit="seconds", targets=[ Target( - expr='sum(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)', + expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m])', legend="Backpressure Time: {{dataset}}, {{operator}}", ) ], diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index be23a3001eb5..278c971e39e3 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -330,7 +330,15 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: return None return self._op_resource_allocator.get_budget(op) - + def clear_usages_and_budget(self): + """Resets the usages and budgets. This function should only be called on + executor shutdown. + """ + assert isinstance(self.op_resource_allocator, ReservationOpResourceAllocator) + for op in self._op_usages: + self._op_usages[op] = ExecutionResources.zero() + self.op_resource_allocator._op_budgets[op] = ExecutionResources.zero() + self.op_resource_allocator._output_budgets[op] = 0 class OpResourceAllocator(ABC): """An interface for dynamic operator resource allocation. diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 7d95c7e4fc72..be96195289de 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -258,8 +258,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None): stats_summary_string = self._final_stats.to_summary().to_string( include_parent=False ) - # Reset the scheduling loop duration gauge. - self._sched_loop_duration_s.set(0, tags={"dataset": self._dataset_id}) + # Reset the scheduling loop duration gauge + resource manager budgets/usages. + self._resource_manager.clear_usages_and_budget() + self.update_metrics(0) if self._data_context.enable_auto_log_stats: logger.info(stats_summary_string) # Close the progress bars from top to bottom to avoid them jumping From c0d459bb2218f0bb039620429ec6d18c849fef5e Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Mon, 6 Oct 2025 17:33:27 -0700 Subject: [PATCH 02/13] lint Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 278c971e39e3..aa5b0b54b4e7 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -339,6 +339,8 @@ def clear_usages_and_budget(self): self._op_usages[op] = ExecutionResources.zero() self.op_resource_allocator._op_budgets[op] = ExecutionResources.zero() self.op_resource_allocator._output_budgets[op] = 0 + + class OpResourceAllocator(ABC): """An interface for dynamic operator resource allocation. From 0d4339f09b14dfb9a6e84c31a7607b0388c763e4 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 14:09:25 -0700 Subject: [PATCH 03/13] zero out budget without clearing Signed-off-by: iamjustinhsu --- .../_internal/execution/resource_manager.py | 17 +++++------------ .../_internal/execution/streaming_executor.py | 2 +- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index aa5b0b54b4e7..7b27cf26522c 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -330,17 +330,6 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: return None return self._op_resource_allocator.get_budget(op) - def clear_usages_and_budget(self): - """Resets the usages and budgets. This function should only be called on - executor shutdown. - """ - assert isinstance(self.op_resource_allocator, ReservationOpResourceAllocator) - for op in self._op_usages: - self._op_usages[op] = ExecutionResources.zero() - self.op_resource_allocator._op_budgets[op] = ExecutionResources.zero() - self.op_resource_allocator._output_budgets[op] = 0 - - class OpResourceAllocator(ABC): """An interface for dynamic operator resource allocation. @@ -697,8 +686,12 @@ def _get_downstream_eligible_ops( def update_usages(self): self._update_reservation() - self._op_budgets.clear() + eligible_ops = self._get_eligible_ops() + + for op in eligible_ops: + self._op_budgets[op] = ExecutionResources.zero() + if len(eligible_ops) == 0: return diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index be96195289de..664bca0c9865 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -259,7 +259,7 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None): include_parent=False ) # Reset the scheduling loop duration gauge + resource manager budgets/usages. - self._resource_manager.clear_usages_and_budget() + self._resource_manager.update_usages() self.update_metrics(0) if self._data_context.enable_auto_log_stats: logger.info(stats_summary_string) From c71130e8e505b6004ebd1cfe2f415f70726b1011 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 14:10:08 -0700 Subject: [PATCH 04/13] lint Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 7b27cf26522c..886cecec55cb 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -330,6 +330,7 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: return None return self._op_resource_allocator.get_budget(op) + class OpResourceAllocator(ABC): """An interface for dynamic operator resource allocation. @@ -686,7 +687,6 @@ def _get_downstream_eligible_ops( def update_usages(self): self._update_reservation() - eligible_ops = self._get_eligible_ops() for op in eligible_ops: From b0672a81498c176dbb87dfc84420dc8bdf44a847 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 14:19:02 -0700 Subject: [PATCH 05/13] fix Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 886cecec55cb..8fd299e9bb1a 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -687,14 +687,15 @@ def _get_downstream_eligible_ops( def update_usages(self): self._update_reservation() - eligible_ops = self._get_eligible_ops() - - for op in eligible_ops: + for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() + eligible_ops = self._get_eligible_ops() if len(eligible_ops) == 0: return + self._op_budgets = {op: None for op in eligible_ops} + # Remaining of shared resources. remaining_shared = self._total_shared for op in eligible_ops: From ada3c90fd28a35907e1ae82dcf9cc3f99fa9cfef Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 14:20:48 -0700 Subject: [PATCH 06/13] lint Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 8fd299e9bb1a..04a6c2c36901 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -687,6 +687,9 @@ def _get_downstream_eligible_ops( def update_usages(self): self._update_reservation() + # NOTE: We don't call self._op_budgets.clear() here + # so that the last completed operator can reset their + # metrics. for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() From 8f12426ca47c871e8d97e662bfa45988210c3770 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 15:51:52 -0700 Subject: [PATCH 07/13] clearer? Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 04a6c2c36901..17ce517fafa8 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -687,9 +687,9 @@ def _get_downstream_eligible_ops( def update_usages(self): self._update_reservation() - # NOTE: We don't call self._op_budgets.clear() here - # so that the last completed operator can reset their - # metrics. + # NOTE: We don't call self._op_budgets.clear() so that + # the streaming executor can indirectly loop through + # self._op_budgets and reset their budget gauges. for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() @@ -697,8 +697,6 @@ def update_usages(self): if len(eligible_ops) == 0: return - self._op_budgets = {op: None for op in eligible_ops} - # Remaining of shared resources. remaining_shared = self._total_shared for op in eligible_ops: From a95b8929bca41f1e5ab868cf4787fa044f8710e2 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 15:52:35 -0700 Subject: [PATCH 08/13] clearer? pt2 Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 17ce517fafa8..144427b04d41 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -689,7 +689,8 @@ def update_usages(self): # NOTE: We don't call self._op_budgets.clear() so that # the streaming executor can indirectly loop through - # self._op_budgets and reset their budget gauges. + # self._op_budgets and reset each operator's budget gauges. + # clear(), will wipe out that information. for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() From de78cedd1381983dba97e8b529768a703555acca Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 15:53:00 -0700 Subject: [PATCH 09/13] ugh Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 144427b04d41..30d07700f67b 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -689,7 +689,7 @@ def update_usages(self): # NOTE: We don't call self._op_budgets.clear() so that # the streaming executor can indirectly loop through - # self._op_budgets and reset each operator's budget gauges. + # self._op_budgets and reset each operator budget in gauges. # clear(), will wipe out that information. for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() From 0aa20493f293e6ade33b12970b10132dbbb95838 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 15:54:13 -0700 Subject: [PATCH 10/13] grammar Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 30d07700f67b..7c5eb85214bb 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -689,7 +689,7 @@ def update_usages(self): # NOTE: We don't call self._op_budgets.clear() so that # the streaming executor can indirectly loop through - # self._op_budgets and reset each operator budget in gauges. + # self._op_budgets and reset each operator's budget in gauges. # clear(), will wipe out that information. for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() From 9a25054418be08942b33bc68e1a812f84566f083 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 15:54:52 -0700 Subject: [PATCH 11/13] guh Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 7c5eb85214bb..2d329d0abc1e 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -690,7 +690,8 @@ def update_usages(self): # NOTE: We don't call self._op_budgets.clear() so that # the streaming executor can indirectly loop through # self._op_budgets and reset each operator's budget in gauges. - # clear(), will wipe out that information. + # clear(), will wipe out that information and that gauge won't + # update. for op in self._op_budgets: self._op_budgets[op] = ExecutionResources.zero() From 5838314ab02510117040dc27fd914b0596729ea3 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 16:03:32 -0700 Subject: [PATCH 12/13] use None Signed-off-by: iamjustinhsu --- python/ray/data/_internal/execution/resource_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 2d329d0abc1e..29724fcd57dc 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -693,7 +693,7 @@ def update_usages(self): # clear(), will wipe out that information and that gauge won't # update. for op in self._op_budgets: - self._op_budgets[op] = ExecutionResources.zero() + self._op_budgets[op] = None eligible_ops = self._get_eligible_ops() if len(eligible_ops) == 0: From 24f87eb5253f83f5e1642cdca08325f5116876a2 Mon Sep 17 00:00:00 2001 From: iamjustinhsu Date: Tue, 21 Oct 2025 16:08:48 -0700 Subject: [PATCH 13/13] try this Signed-off-by: iamjustinhsu --- .../data/_internal/execution/resource_manager.py | 9 +-------- .../_internal/execution/streaming_executor.py | 16 +++++++++++----- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 29724fcd57dc..be23a3001eb5 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -687,14 +687,7 @@ def _get_downstream_eligible_ops( def update_usages(self): self._update_reservation() - # NOTE: We don't call self._op_budgets.clear() so that - # the streaming executor can indirectly loop through - # self._op_budgets and reset each operator's budget in gauges. - # clear(), will wipe out that information and that gauge won't - # update. - for op in self._op_budgets: - self._op_budgets[op] = None - + self._op_budgets.clear() eligible_ops = self._get_eligible_ops() if len(eligible_ops) == 0: return diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index 664bca0c9865..3a25c8f6b506 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -365,7 +365,12 @@ def update_metrics(self, sched_loop_duration: int): def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]): budget = self._resource_manager.get_budget(op) - if budget is not None: + if budget is None: + cpu_budget = 0 + gpu_budget = 0 + memory_budget = 0 + object_store_memory_budget = 0 + else: # Convert inf to -1 to represent unlimited budget in metrics cpu_budget = -1 if math.isinf(budget.cpu) else budget.cpu gpu_budget = -1 if math.isinf(budget.gpu) else budget.gpu @@ -375,10 +380,11 @@ def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]): if math.isinf(budget.object_store_memory) else budget.object_store_memory ) - self._cpu_budget_gauge.set(cpu_budget, tags=tags) - self._gpu_budget_gauge.set(gpu_budget, tags=tags) - self._memory_budget_gauge.set(memory_budget, tags=tags) - self._osm_budget_gauge.set(object_store_memory_budget, tags=tags) + + self._cpu_budget_gauge.set(cpu_budget, tags=tags) + self._gpu_budget_gauge.set(gpu_budget, tags=tags) + self._memory_budget_gauge.set(memory_budget, tags=tags) + self._osm_budget_gauge.set(object_store_memory_budget, tags=tags) def _update_max_bytes_to_read_metric( self, op: PhysicalOperator, tags: Dict[str, str]