-
Notifications
You must be signed in to change notification settings - Fork 7.1k
[Data] ConcurrencyCapBackpressurePolicy - Handle internal output queue buildup #57996
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
alexeykudinkin
merged 9 commits into
master
from
srinathk10/concurrency_cap_backpressure_policy
Oct 23, 2025
Merged
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
38f6ab3
[Data] ConcurrencyCapBackpressurePolicy - Handle internal output queu…
srinathk10 8e49704
Cleanups
srinathk10 bd8ad05
Cleanup
srinathk10 01463ef
Cleanup
srinathk10 907d022
Fixups
srinathk10 999fb06
Cleanup
srinathk10 8d65ebf
Cleanup
srinathk10 d5037b9
Address comments
srinathk10 ff91966
Lint
srinathk10 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
424 changes: 417 additions & 7 deletions
424
...n/ray/data/_internal/execution/backpressure_policy/concurrency_cap_backpressure_policy.py
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -277,6 +277,14 @@ def get_op_usage(self, op: PhysicalOperator) -> ExecutionResources: | |
| """Return the resource usage of the given operator at the current time.""" | ||
| return self._op_usages[op] | ||
|
|
||
| def get_mem_op_internal(self, op: PhysicalOperator) -> int: | ||
| """Return the memory usage of the internal buffers of the given operator.""" | ||
| return self._mem_op_internal[op] | ||
|
|
||
| def get_mem_op_outputs(self, op: PhysicalOperator) -> int: | ||
| """Return the memory usage of the outputs of the given operator.""" | ||
| return self._mem_op_outputs[op] | ||
|
|
||
| def get_op_usage_str(self, op: PhysicalOperator) -> str: | ||
| """Return a human-readable string representation of the resource usage of | ||
| the given operator.""" | ||
|
|
@@ -288,8 +296,8 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str: | |
| ) | ||
| if self._debug: | ||
| usage_str += ( | ||
| f" (in={memory_string(self._mem_op_internal[op])}," | ||
| f"out={memory_string(self._mem_op_outputs[op])})" | ||
| f" (in={memory_string(self.get_mem_op_internal(op))}," | ||
| f"out={memory_string(self.get_mem_op_outputs(op))})" | ||
| ) | ||
| if ( | ||
| isinstance(self._op_resource_allocator, ReservationOpResourceAllocator) | ||
|
|
@@ -330,6 +338,69 @@ def get_budget(self, op: PhysicalOperator) -> Optional[ExecutionResources]: | |
| return None | ||
| return self._op_resource_allocator.get_budget(op) | ||
|
|
||
| def is_op_eligible(self, op: PhysicalOperator) -> bool: | ||
| """Whether the op is eligible for memory reservation.""" | ||
| return ( | ||
| not op.throttling_disabled() | ||
| # As long as the op has finished execution, even if there are still | ||
| # non-taken outputs, we don't need to allocate resources for it. | ||
| and not op.execution_finished() | ||
| ) | ||
|
|
||
| def get_eligible_ops(self) -> List[PhysicalOperator]: | ||
| return [op for op in self._topology if self.is_op_eligible(op)] | ||
|
|
||
| def get_downstream_ineligible_ops( | ||
| self, op: PhysicalOperator | ||
| ) -> Iterable[PhysicalOperator]: | ||
| """Get the downstream ineligible operators of the given operator. | ||
|
|
||
| E.g., | ||
| - "cur_map->downstream_map" will return an empty list. | ||
| - "cur_map->limit1->limit2->downstream_map" will return [limit1, limit2]. | ||
| """ | ||
| for next_op in op.output_dependencies: | ||
| if not self.is_op_eligible(next_op): | ||
| yield next_op | ||
| yield from self.get_downstream_ineligible_ops(next_op) | ||
|
|
||
| def get_downstream_eligible_ops( | ||
| self, op: PhysicalOperator | ||
| ) -> Iterable[PhysicalOperator]: | ||
| """Get the downstream eligible operators of the given operator, ignoring | ||
| intermediate ineligible operators. | ||
|
|
||
| E.g., | ||
| - "cur_map->downstream_map" will return [downstream_map]. | ||
| - "cur_map->limit1->limit2->downstream_map" will return [downstream_map]. | ||
| """ | ||
| for next_op in op.output_dependencies: | ||
| if self.is_op_eligible(next_op): | ||
| yield next_op | ||
| else: | ||
| yield from self.get_downstream_eligible_ops(next_op) | ||
|
|
||
| def get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> float: | ||
| """Get the outputs memory usage of the given operator, including the downstream | ||
| ineligible operators. | ||
| """ | ||
| # Outputs usage of the current operator. | ||
| op_outputs_usage = self.get_mem_op_outputs(op) | ||
| # Also account the downstream ineligible operators' memory usage. | ||
| op_outputs_usage += sum( | ||
| self.get_op_usage(next_op).object_store_memory | ||
| for next_op in self.get_downstream_ineligible_ops(op) | ||
| ) | ||
| return op_outputs_usage | ||
|
|
||
| def get_op_outputs_usage_with_internal_and_downstream( | ||
| self, op: PhysicalOperator | ||
| ) -> float: | ||
| """Get the outputs memory usage of the given operator, including the internal usage and the downstream ineligible operators.""" | ||
| return self.get_mem_op_internal(op) + self.get_op_outputs_usage_with_downstream( | ||
| op | ||
| ) | ||
|
|
||
|
||
|
|
||
| class OpResourceAllocator(ABC): | ||
| """An interface for dynamic operator resource allocation. | ||
|
|
@@ -479,20 +550,6 @@ def __init__(self, resource_manager: ResourceManager, reservation_ratio: float): | |
|
|
||
| self._idle_detector = self.IdleDetector() | ||
|
|
||
| def _is_op_eligible(self, op: PhysicalOperator) -> bool: | ||
| """Whether the op is eligible for memory reservation.""" | ||
| return ( | ||
| not op.throttling_disabled() | ||
| # As long as the op has finished execution, even if there are still | ||
| # non-taken outputs, we don't need to allocate resources for it. | ||
| and not op.execution_finished() | ||
| ) | ||
|
|
||
| def _get_eligible_ops(self) -> List[PhysicalOperator]: | ||
| return [ | ||
| op for op in self._resource_manager._topology if self._is_op_eligible(op) | ||
| ] | ||
|
|
||
| def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]: | ||
| """ | ||
| Resource reservation is based on the number of eligible operators. | ||
|
|
@@ -519,14 +576,14 @@ def _get_ineligible_ops_with_usage(self) -> List[PhysicalOperator]: | |
| # filter out downstream ineligible operators since they are omitted from reservation calculations. | ||
| for op in last_completed_ops: | ||
| ops_to_exclude_from_reservation.extend( | ||
| list(self._get_downstream_ineligible_ops(op)) | ||
| list(self._resource_manager.get_downstream_ineligible_ops(op)) | ||
| ) | ||
| ops_to_exclude_from_reservation.append(op) | ||
| return list(set(ops_to_exclude_from_reservation)) | ||
|
|
||
| def _update_reservation(self): | ||
| global_limits = self._resource_manager.get_global_limits().copy() | ||
| eligible_ops = self._get_eligible_ops() | ||
| eligible_ops = self._resource_manager.get_eligible_ops() | ||
|
|
||
| self._op_reserved.clear() | ||
| self._reserved_for_op_outputs.clear() | ||
|
|
@@ -610,7 +667,7 @@ def _should_unblock_streaming_output_backpressure( | |
| # launch tasks. Then we should temporarily unblock the streaming output | ||
| # backpressure by allowing reading at least 1 block. So the current operator | ||
| # can finish at least one task and yield resources to the downstream operators. | ||
| for next_op in self._get_downstream_eligible_ops(op): | ||
| for next_op in self._resource_manager.get_downstream_eligible_ops(op): | ||
| if not self._reserved_min_resources[next_op]: | ||
| # Case 1: the downstream operator hasn't reserved the minimum resources | ||
| # to run at least one task. | ||
|
|
@@ -623,25 +680,14 @@ def _should_unblock_streaming_output_backpressure( | |
| return True | ||
| return False | ||
|
|
||
| def _get_op_outputs_usage_with_downstream(self, op: PhysicalOperator) -> float: | ||
| """Get the outputs memory usage of the given operator, including the downstream | ||
| ineligible operators. | ||
| """ | ||
| # Outputs usage of the current operator. | ||
| op_outputs_usage = self._resource_manager._mem_op_outputs[op] | ||
| # Also account the downstream ineligible operators' memory usage. | ||
| op_outputs_usage += sum( | ||
| self._resource_manager.get_op_usage(next_op).object_store_memory | ||
| for next_op in self._get_downstream_ineligible_ops(op) | ||
| ) | ||
| return op_outputs_usage | ||
|
|
||
| def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: | ||
| if op not in self._op_budgets: | ||
| return None | ||
| res = self._op_budgets[op].object_store_memory | ||
| # Add the remaining of `_reserved_for_op_outputs`. | ||
| op_outputs_usage = self._get_op_outputs_usage_with_downstream(op) | ||
| op_outputs_usage = self._resource_manager.get_op_outputs_usage_with_downstream( | ||
| op | ||
| ) | ||
| res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0) | ||
| if math.isinf(res): | ||
| self._output_budgets[op] = res | ||
|
|
@@ -654,41 +700,11 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]: | |
| self._output_budgets[op] = res | ||
| return res | ||
|
|
||
| def _get_downstream_ineligible_ops( | ||
| self, op: PhysicalOperator | ||
| ) -> Iterable[PhysicalOperator]: | ||
| """Get the downstream ineligible operators of the given operator. | ||
|
|
||
| E.g., | ||
| - "cur_map->downstream_map" will return an empty list. | ||
| - "cur_map->limit1->limit2->downstream_map" will return [limit1, limit2]. | ||
| """ | ||
| for next_op in op.output_dependencies: | ||
| if not self._is_op_eligible(next_op): | ||
| yield next_op | ||
| yield from self._get_downstream_ineligible_ops(next_op) | ||
|
|
||
| def _get_downstream_eligible_ops( | ||
| self, op: PhysicalOperator | ||
| ) -> Iterable[PhysicalOperator]: | ||
| """Get the downstream eligible operators of the given operator, ignoring | ||
| intermediate ineligible operators. | ||
|
|
||
| E.g., | ||
| - "cur_map->downstream_map" will return [downstream_map]. | ||
| - "cur_map->limit1->limit2->downstream_map" will return [downstream_map]. | ||
| """ | ||
| for next_op in op.output_dependencies: | ||
| if self._is_op_eligible(next_op): | ||
| yield next_op | ||
| else: | ||
| yield from self._get_downstream_eligible_ops(next_op) | ||
|
|
||
| def update_usages(self): | ||
| self._update_reservation() | ||
|
|
||
| self._op_budgets.clear() | ||
| eligible_ops = self._get_eligible_ops() | ||
| eligible_ops = self._resource_manager.get_eligible_ops() | ||
| if len(eligible_ops) == 0: | ||
| return | ||
|
|
||
|
|
@@ -699,10 +715,12 @@ def update_usages(self): | |
| op_mem_usage = 0 | ||
| # Add the memory usage of the operator itself, | ||
| # excluding `_reserved_for_op_outputs`. | ||
| op_mem_usage += self._resource_manager._mem_op_internal[op] | ||
| op_mem_usage += self._resource_manager.get_mem_op_internal(op) | ||
| # Add the portion of op outputs usage that has | ||
| # exceeded `_reserved_for_op_outputs`. | ||
| op_outputs_usage = self._get_op_outputs_usage_with_downstream(op) | ||
| op_outputs_usage = ( | ||
| self._resource_manager.get_op_outputs_usage_with_downstream(op) | ||
| ) | ||
| op_mem_usage += max(op_outputs_usage - self._reserved_for_op_outputs[op], 0) | ||
| op_usage = self._resource_manager.get_op_usage(op).copy( | ||
| object_store_memory=op_mem_usage | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.