[Data] Display pending actors separately in the progress bar and not count them towards running resources (#46384) (#46729)

## Why are these changes needed?
Currently, when using Ray Dataset in a CPU+GPU workload, the progress bar
sometimes reports more GPUs in use than are actually running. This is
because pending actors are counted toward the total GPU usage even when
there are no GPUs left in the cluster for them. This change excludes
pending actors when calculating running GPU usage and displays them
separately in the progress bar.
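
As a minimal sketch of the new accounting (illustrative only; `Resources`
below is a stand-in, not Ray's actual `ExecutionResources` class), running
usage is derived by subtracting pending-actor resources from the operator's
current usage, instead of reporting both as one number:

```python
# Minimal sketch of the new accounting -- illustrative only.
from dataclasses import dataclass


@dataclass
class Resources:
    cpu: float = 0.0
    gpu: float = 0.0

    def subtract(self, other: "Resources") -> "Resources":
        return Resources(self.cpu - other.cpu, self.gpu - other.gpu)


current = Resources(cpu=8.0, gpu=4.0)  # includes two pending GPU actors
pending = Resources(cpu=2.0, gpu=2.0)  # actors still waiting for resources
running = current.subtract(pending)    # what the progress bar now reports
print(running)  # Resources(cpu=6.0, gpu=2.0)
```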

## Related issue number
Resolves #46384 

## Checks

- [x] I've signed off every commit (by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a method in Tune, I've added it in `doc/source/tune/api/` under
the corresponding `.rst` file.
- [x] I've made sure the tests are passing. Note that there might be a
few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: jeffreyjeffreywang <[email protected]>
Signed-off-by: Hao Chen <[email protected]>
Co-authored-by: jeffreyjeffreywang <[email protected]>
Co-authored-by: Scott Lee <[email protected]>
Co-authored-by: Hao Chen <[email protected]>
4 people authored Aug 22, 2024
1 parent 12d8ecf commit e47482d
Showing 6 changed files with 94 additions and 12 deletions.
@@ -424,6 +424,32 @@ def current_processor_usage(self) -> ExecutionResources:
         """
         return ExecutionResources(0, 0, 0)
 
+    def running_processor_usage(self) -> ExecutionResources:
+        """Returns the estimated running CPU and GPU usage of this operator, excluding
+        object store memory.
+
+        This method is called by the resource manager and the streaming
+        executor to display the number of currently running CPUs and GPUs in the
+        progress bar.
+
+        Note, this method returns `current_processor_usage() -
+        pending_processor_usage()` by default. Subclasses should only override
+        `pending_processor_usage()` if needed.
+        """
+        usage = self.current_processor_usage()
+        usage = usage.subtract(self.pending_processor_usage())
+        return usage
+
+    def pending_processor_usage(self) -> ExecutionResources:
+        """Returns the estimated pending CPU and GPU usage of this operator, excluding
+        object store memory.
+
+        This method is called by the resource manager and the streaming
+        executor to display the number of currently pending actors in the
+        progress bar.
+        """
+        return ExecutionResources(0, 0, 0)
+
     def base_resource_usage(self) -> ExecutionResources:
         """Returns the minimum amount of resources required for execution.
@@ -297,6 +297,13 @@ def current_processor_usage(self) -> ExecutionResources:
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
         )
 
+    def pending_processor_usage(self) -> ExecutionResources:
+        num_pending_workers = self._actor_pool.num_pending_actors()
+        return ExecutionResources(
+            cpu=self._ray_remote_args.get("num_cpus", 0) * num_pending_workers,
+            gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
+        )
+
     def incremental_resource_usage(self) -> ExecutionResources:
         # Submitting tasks to existing actors doesn't require additional
         # CPU/GPU resources.
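
For intuition, a hypothetical worked example of the actor-pool math above
(numbers invented; this assumes, as the base-class subtraction implies, that
`current_processor_usage()` counts both running and pending actors):

```python
# Hypothetical numbers, not from the PR: a pool launched with num_gpus=1,
# with 4 actors running and 2 still pending because no GPUs are free.
num_gpus_per_actor = 1
num_active, num_pending = 4, 2

current_gpu = num_gpus_per_actor * (num_active + num_pending)  # 6 (old display)
pending_gpu = num_gpus_per_actor * num_pending                 # 2
running_gpu = current_gpu - pending_gpu                        # 4 (new display)
print(f"running: {running_gpu} GPU, pending: {pending_gpu} GPU")
```
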
4 changes: 4 additions & 0 deletions python/ray/data/_internal/execution/operators/map_operator.py
@@ -408,6 +408,10 @@ def shutdown(self):
     def current_processor_usage(self) -> ExecutionResources:
         raise NotImplementedError
 
+    @abstractmethod
+    def pending_processor_usage(self) -> ExecutionResources:
+        raise NotImplementedError
+
     @abstractmethod
     def base_resource_usage(self) -> ExecutionResources:
         raise NotImplementedError
@@ -118,6 +118,9 @@ def current_processor_usage(self) -> ExecutionResources:
             gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
         )
 
+    def pending_processor_usage(self) -> ExecutionResources:
+        return ExecutionResources()
+
     def incremental_resource_usage(self) -> ExecutionResources:
         return ExecutionResources(
             cpu=self._ray_remote_args.get("num_cpus", 0),
50 changes: 44 additions & 6 deletions python/ray/data/_internal/execution/resource_manager.py
@@ -50,7 +50,11 @@ def __init__(
         self._global_limits = ExecutionResources.zero()
         self._global_limits_last_update_time = 0
         self._global_usage = ExecutionResources.zero()
+        self._global_running_usage = ExecutionResources.zero()
+        self._global_pending_usage = ExecutionResources.zero()
         self._op_usages: Dict[PhysicalOperator, ExecutionResources] = {}
+        self._op_running_usages: Dict[PhysicalOperator, ExecutionResources] = {}
+        self._op_pending_usages: Dict[PhysicalOperator, ExecutionResources] = {}
         # Object store memory usage internal to the operator, including the
         # pending task outputs and op's internal output buffers.
         self._mem_op_internal: Dict[PhysicalOperator, int] = defaultdict(int)
@@ -109,21 +113,45 @@ def update_usages(self):
         # And some computations are redundant. We should either remove redundant
         # computations or remove this method entirely and compute usages on demand.
         self._global_usage = ExecutionResources(0, 0, 0)
+        self._global_running_usage = ExecutionResources(0, 0, 0)
+        self._global_pending_usage = ExecutionResources(0, 0, 0)
         self._op_usages.clear()
+        self._op_running_usages.clear()
+        self._op_pending_usages.clear()
         self._downstream_fraction.clear()
         self._downstream_object_store_memory.clear()
 
         # Iterate from last to first operator.
         num_ops_so_far = 0
         num_ops_total = len(self._topology)
         for op, state in reversed(self._topology.items()):
-            # Update `self._op_usages`.
+            # Update `self._op_usages`, `self._op_running_usages`,
+            # and `self._op_pending_usages`.
             op_usage = op.current_processor_usage()
+            op_running_usage = op.running_processor_usage()
+            op_pending_usage = op.pending_processor_usage()
+
             assert not op_usage.object_store_memory
+            assert not op_running_usage.object_store_memory
+            assert not op_pending_usage.object_store_memory
             op_usage.object_store_memory = self._estimate_object_store_memory(op, state)
+            op_running_usage.object_store_memory = self._estimate_object_store_memory(
+                op, state
+            )
             self._op_usages[op] = op_usage
-            # Update `self._global_usage`.
+            self._op_running_usages[op] = op_running_usage
+            self._op_pending_usages[op] = op_pending_usage
+
+            # Update `self._global_usage`, `self._global_running_usage`,
+            # and `self._global_pending_usage`.
             self._global_usage = self._global_usage.add(op_usage)
+            self._global_running_usage = self._global_running_usage.add(
+                op_running_usage
+            )
+            self._global_pending_usage = self._global_pending_usage.add(
+                op_pending_usage
+            )
 
             # Update `self._downstream_fraction` and `_downstream_object_store_memory`.
             # Subtract one from denom to account for input buffer.
             f = (1.0 + num_ops_so_far) / max(1.0, num_ops_total - 1.0)
@@ -144,6 +172,14 @@ def get_global_usage(self) -> ExecutionResources:
         """Return the global resource usage at the current time."""
         return self._global_usage
 
+    def get_global_running_usage(self) -> ExecutionResources:
+        """Return the global running resource usage at the current time."""
+        return self._global_running_usage
+
+    def get_global_pending_usage(self) -> ExecutionResources:
+        """Return the global pending resource usage at the current time."""
+        return self._global_pending_usage
+
     def get_global_limits(self) -> ExecutionResources:
         """Return the global resource limits at the current time.
@@ -177,10 +213,12 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources:
     def get_op_usage_str(self, op: PhysicalOperator) -> str:
         """Return a human-readable string representation of the resource usage of
         the given operator."""
-        usage_str = f"cpu: {self._op_usages[op].cpu:.1f}"
-        if self._op_usages[op].gpu:
-            usage_str += f", gpu: {self._op_usages[op].gpu:.1f}"
-        usage_str += f", objects: {self._op_usages[op].object_store_memory_str()}"
+        usage_str = f"cpu: {self._op_running_usages[op].cpu:.1f}"
+        if self._op_running_usages[op].gpu:
+            usage_str += f", gpu: {self._op_running_usages[op].gpu:.1f}"
+        usage_str += (
+            f", objects: {self._op_running_usages[op].object_store_memory_str()}"
+        )
         if self._debug:
             usage_str += (
                 f" (in={memory_string(self._mem_op_internal[op])},"
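
Note that `get_op_usage_str` now builds the per-operator string from running
usage only, so pending actors no longer inflate the per-operator numbers; with
hypothetical values it would render something like `cpu: 4.0, gpu: 2.0,
objects: 512.0MB`.
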
16 changes: 10 additions & 6 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -352,14 +352,18 @@ def _consumer_idling(self) -> bool:
         return len(self._output_node.outqueue) == 0
 
     def _report_current_usage(self) -> None:
-        cur_usage = self._resource_manager.get_global_usage()
+        running_usage = self._resource_manager.get_global_running_usage()
+        pending_usage = self._resource_manager.get_global_pending_usage()
         limits = self._resource_manager.get_global_limits()
         resources_status = (
-            "Running: "
-            f"{cur_usage.cpu:.4g}/{limits.cpu:.4g} CPU, "
-            f"{cur_usage.gpu:.4g}/{limits.gpu:.4g} GPU, "
-            f"{cur_usage.object_store_memory_str()}/"
-            f"{limits.object_store_memory_str()} object_store_memory"
+            "Running. Resources: "
+            f"{running_usage.cpu:.4g}/{limits.cpu:.4g} CPU, "
+            f"{running_usage.gpu:.4g}/{limits.gpu:.4g} GPU, "
+            f"{running_usage.object_store_memory_str()}/"
+            f"{limits.object_store_memory_str()} object_store_memory "
+            "(pending: "
+            f"{pending_usage.cpu:.4g} CPU, "
+            f"{pending_usage.gpu:.4g} GPU)"
        )
         if self._global_info:
             self._global_info.set_description(resources_status)
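
With hypothetical usage numbers (invented for illustration), the new status
line rendered by `_report_current_usage` would read along these lines, whereas
the old format stopped after `object_store_memory` and folded pending actors
into the running counts:

```
Running. Resources: 6/8 CPU, 2/4 GPU, 512.0MB/2.0GB object_store_memory (pending: 2 CPU, 2 GPU)
```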
