Skip to content

Commit 75b4ad5

Browse files
iamjustinhsuFuture-Outlier
authored andcommitted
[data] reset cpu + gpu metrics on executor shutdown and updating task submission/block generation metrics (ray-project#57246)
<!-- Thank you for your contribution! Please review https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before opening a pull request. --> <!-- Please add a reviewer to the assignee section when you create a PR. If you don't have the access to it, we will shortly find a reviewer and assign them to your PR. --> ## Why are these changes needed? On executor shutdown, the metrics persist even after execution. The plan is to reset on streaming_executor.shutdown. This PR also includes 2 potential drive-by fixes for metric calculation <!-- Please give a short summary of the change and the problem this solves. --> ## Related issue number <!-- For example: "Closes ray-project#1234" --> ## Checks - [ ] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [ ] I've run pre-commit jobs to lint the changes in this PR. ([pre-commit setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting)) - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file. - [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - Testing Strategy - [ ] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: iamjustinhsu <[email protected]> Signed-off-by: Future-Outlier <[email protected]>
1 parent 7d701e8 commit 75b4ad5

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

python/ray/dashboard/modules/metrics/dashboards/data_dashboard_panels.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@
453453
unit="s",
454454
targets=[
455455
Target(
456-
expr='sum(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
456+
expr='increase(ray_data_block_generation_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_task_outputs_generated{{{global_filters}, operator=~"$Operator"}}[5m])',
457457
legend="Block Generation Time: {{dataset}}, {{operator}}",
458458
)
459459
],
@@ -468,7 +468,7 @@
468468
unit="s",
469469
targets=[
470470
Target(
471-
expr='sum(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}) by (dataset, operator)',
471+
expr='increase(ray_data_task_submission_backpressure_time{{{global_filters}, operator=~"$Operator"}}[5m]) / increase(ray_data_num_tasks_submitted{{{global_filters}, operator=~"$Operator"}}[5m])',
472472
legend="Backpressure Time: {{dataset}}, {{operator}}",
473473
)
474474
],

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,9 @@ def shutdown(self, force: bool, exception: Optional[Exception] = None):
277277
stats_summary_string = self._final_stats.to_summary().to_string(
278278
include_parent=False
279279
)
280-
# Reset the scheduling loop duration gauge.
281-
self._sched_loop_duration_s.set(0, tags={"dataset": self._dataset_id})
280+
# Reset the scheduling loop duration gauge + resource manager budgets/usages.
281+
self._resource_manager.update_usages()
282+
self.update_metrics(0)
282283
if self._data_context.enable_auto_log_stats:
283284
logger.info(stats_summary_string)
284285
# Close the progress manager with a finishing message.
@@ -387,7 +388,12 @@ def update_metrics(self, sched_loop_duration: int):
387388

388389
def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]):
389390
budget = self._resource_manager.get_budget(op)
390-
if budget is not None:
391+
if budget is None:
392+
cpu_budget = 0
393+
gpu_budget = 0
394+
memory_budget = 0
395+
object_store_memory_budget = 0
396+
else:
391397
# Convert inf to -1 to represent unlimited budget in metrics
392398
cpu_budget = -1 if math.isinf(budget.cpu) else budget.cpu
393399
gpu_budget = -1 if math.isinf(budget.gpu) else budget.gpu
@@ -397,10 +403,11 @@ def _update_budget_metrics(self, op: PhysicalOperator, tags: Dict[str, str]):
397403
if math.isinf(budget.object_store_memory)
398404
else budget.object_store_memory
399405
)
400-
self._cpu_budget_gauge.set(cpu_budget, tags=tags)
401-
self._gpu_budget_gauge.set(gpu_budget, tags=tags)
402-
self._memory_budget_gauge.set(memory_budget, tags=tags)
403-
self._osm_budget_gauge.set(object_store_memory_budget, tags=tags)
406+
407+
self._cpu_budget_gauge.set(cpu_budget, tags=tags)
408+
self._gpu_budget_gauge.set(gpu_budget, tags=tags)
409+
self._memory_budget_gauge.set(memory_budget, tags=tags)
410+
self._osm_budget_gauge.set(object_store_memory_budget, tags=tags)
404411

405412
def _update_max_bytes_to_read_metric(
406413
self, op: PhysicalOperator, tags: Dict[str, str]

0 commit comments

Comments
 (0)