Skip to content

Commit e0d8c0d

Browse files
iamjustinhsu, Future-Outlier
authored and committed
[data] Clear queue for manually mark_execution_finished operators (ray-project#58441)
## Description Currently, we clear _external_ queues when an operator is manually marked as finished, but we don't clear its _internal_ queues. This PR fixes that by also clearing the operator's internal input and output queues in mark_execution_finished(). ## Related issues Fixes this test: https://buildkite.com/ray-project/postmerge/builds/14223#019a5791-3d46-4ab8-9f97-e03ea1c04bb0/642-736 ## Additional information --------- Signed-off-by: iamjustinhsu <[email protected]> Signed-off-by: Future-Outlier <[email protected]>
1 parent af024c2 commit e0d8c0d

File tree

6 files changed

+94
-3
lines changed

6 files changed

+94
-3
lines changed

python/ray/data/_internal/execution/operators/base_physical_operator.py

Lines changed: 40 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,34 @@ def internal_output_queue_num_bytes(self) -> int:
3535
"""Returns Operator's internal output queue size (in bytes)"""
3636
...
3737

38+
@abc.abstractmethod
39+
def clear_internal_input_queue(self) -> None:
40+
"""Clear internal input queue(s).
41+
42+
This should drain all buffered input bundles and update metrics appropriately
43+
by calling on_input_dequeued().
44+
"""
45+
...
46+
47+
@abc.abstractmethod
48+
def clear_internal_output_queue(self) -> None:
49+
"""Clear internal output queue(s).
50+
51+
This should drain all buffered output bundles and update metrics appropriately
52+
by calling on_output_dequeued().
53+
"""
54+
...
55+
56+
def mark_execution_finished(self) -> None:
57+
"""Mark execution as finished and clear internal queues.
58+
59+
This default implementation calls the parent's mark_execution_finished()
60+
and then clears internal input and output queues.
61+
"""
62+
super().mark_execution_finished()
63+
self.clear_internal_input_queue()
64+
self.clear_internal_output_queue()
65+
3866

3967
class OneToOneOperator(PhysicalOperator):
4068
"""An operator that has one input and one output dependency.
@@ -137,6 +165,18 @@ def internal_output_queue_num_blocks(self) -> int:
137165
def internal_output_queue_num_bytes(self) -> int:
138166
return sum(bundle.size_bytes() for bundle in self._output_buffer)
139167

168+
def clear_internal_input_queue(self) -> None:
169+
"""Clear internal input queue."""
170+
while self._input_buffer:
171+
bundle = self._input_buffer.pop()
172+
self._metrics.on_input_dequeued(bundle)
173+
174+
def clear_internal_output_queue(self) -> None:
175+
"""Clear internal output queue."""
176+
while self._output_buffer:
177+
bundle = self._output_buffer.pop()
178+
self._metrics.on_output_dequeued(bundle)
179+
140180
def all_inputs_done(self) -> None:
141181
ctx = TaskContext(
142182
task_idx=self._next_task_index,

python/ray/data/_internal/execution/operators/map_operator.py

Lines changed: 15 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -65,7 +65,7 @@
6565
logger = logging.getLogger(__name__)
6666

6767

68-
class MapOperator(OneToOneOperator, InternalQueueOperatorMixin, ABC):
68+
class MapOperator(InternalQueueOperatorMixin, OneToOneOperator, ABC):
6969
"""A streaming operator that maps input bundles 1:1 to output bundles.
7070
7171
This operator implements the distributed map operation, supporting both task
@@ -108,7 +108,7 @@ def __init__(
108108
self._block_ref_bundler = _BlockRefBundler(min_rows_per_bundle)
109109

110110
# Queue for task outputs, either ordered or unordered (this is set by start()).
111-
self._output_queue: _OutputQueue = None
111+
self._output_queue: Optional[_OutputQueue] = None
112112
# Output metadata, added to on get_next().
113113
self._output_blocks_stats: List[BlockStats] = []
114114
# All active `DataOpTask`s.
@@ -165,6 +165,19 @@ def internal_output_queue_num_blocks(self) -> int:
165165
def internal_output_queue_num_bytes(self) -> int:
166166
return self._output_queue.size_bytes()
167167

168+
def clear_internal_input_queue(self) -> None:
169+
"""Clear internal input queue (block ref bundler)."""
170+
while self._block_ref_bundler.has_bundle():
171+
(input_bundles, _) = self._block_ref_bundler.get_next_bundle()
172+
for input_bundle in input_bundles:
173+
self._metrics.on_input_dequeued(input_bundle)
174+
175+
def clear_internal_output_queue(self) -> None:
176+
"""Clear internal output queue."""
177+
while self._output_queue.has_next():
178+
bundle = self._output_queue.get_next()
179+
self._metrics.on_output_dequeued(bundle)
180+
168181
@property
169182
def name(self) -> str:
170183
name = super().name

python/ray/data/_internal/execution/operators/output_splitter.py

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -170,6 +170,18 @@ def internal_output_queue_num_blocks(self) -> int:
170170
def internal_output_queue_num_bytes(self) -> int:
171171
return sum(b.size_bytes() for b in self._output_queue)
172172

173+
def clear_internal_input_queue(self) -> None:
174+
"""Clear internal input queue."""
175+
while self._buffer:
176+
bundle = self._buffer.pop()
177+
self._metrics.on_input_dequeued(bundle)
178+
179+
def clear_internal_output_queue(self) -> None:
180+
"""Clear internal output queue."""
181+
while self._output_queue:
182+
bundle = self._output_queue.popleft()
183+
self._metrics.on_output_dequeued(bundle)
184+
173185
def progress_str(self) -> str:
174186
if self._locality_hints:
175187
return locality_string(self._locality_hits, self._locality_misses)

python/ray/data/_internal/execution/operators/union_operator.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,19 @@ def internal_output_queue_num_blocks(self) -> int:
8585
def internal_output_queue_num_bytes(self) -> int:
8686
return sum(q.size_bytes() for q in self._output_buffer)
8787

88+
def clear_internal_input_queue(self) -> None:
89+
"""Clear internal input queues."""
90+
for input_buffer in self._input_buffers:
91+
while input_buffer:
92+
bundle = input_buffer.get_next()
93+
self._metrics.on_input_dequeued(bundle)
94+
95+
def clear_internal_output_queue(self) -> None:
96+
"""Clear internal output queue."""
97+
while self._output_buffer:
98+
bundle = self._output_buffer.popleft()
99+
self._metrics.on_output_dequeued(bundle)
100+
88101
def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
89102
assert not self.completed()
90103
assert 0 <= input_index <= len(self._input_dependencies), input_index

python/ray/data/_internal/execution/operators/zip_operator.py

Lines changed: 13 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -93,6 +93,19 @@ def internal_output_queue_num_blocks(self) -> int:
9393
def internal_output_queue_num_bytes(self) -> int:
9494
return sum(bundle.size_bytes() for bundle in self._output_buffer)
9595

96+
def clear_internal_input_queue(self) -> None:
97+
"""Clear internal input queues."""
98+
for input_buffer in self._input_buffers:
99+
while input_buffer:
100+
bundle = input_buffer.popleft()
101+
self._metrics.on_input_dequeued(bundle)
102+
103+
def clear_internal_output_queue(self) -> None:
104+
"""Clear internal output queue."""
105+
while self._output_buffer:
106+
bundle = self._output_buffer.popleft()
107+
self._metrics.on_output_dequeued(bundle)
108+
96109
def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
97110
assert not self.completed()
98111
assert 0 <= input_index <= len(self._input_dependencies), input_index

python/ray/data/tests/test_resource_manager.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ def mock_map_op(
4444
compute_strategy=compute_strategy,
4545
name=name,
4646
)
47-
op.start = MagicMock(side_effect=lambda _: None)
47+
op.start(ExecutionOptions())
4848
if incremental_resource_usage is not None:
4949
op.incremental_resource_usage = MagicMock(
5050
return_value=incremental_resource_usage

0 commit comments

Comments
 (0)