diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index 0493d9c9cb7f..2bd1a5c9f794 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -35,6 +35,34 @@ def internal_output_queue_num_bytes(self) -> int: """Returns Operator's internal output queue size (in bytes)""" ... + @abc.abstractmethod + def clear_internal_input_queue(self) -> None: + """Clear internal input queue(s). + + This should drain all buffered input bundles and update metrics appropriately + by calling on_input_dequeued(). + """ + ... + + @abc.abstractmethod + def clear_internal_output_queue(self) -> None: + """Clear internal output queue(s). + + This should drain all buffered output bundles and update metrics appropriately + by calling on_output_dequeued(). + """ + ... + + def mark_execution_finished(self) -> None: + """Mark execution as finished and clear internal queues. + + This default implementation calls the parent's mark_execution_finished() + and then clears internal input and output queues. + """ + super().mark_execution_finished() + self.clear_internal_input_queue() + self.clear_internal_output_queue() + class OneToOneOperator(PhysicalOperator): """An operator that has one input and one output dependency. @@ -137,6 +165,18 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return sum(bundle.size_bytes() for bundle in self._output_buffer) + def clear_internal_input_queue(self) -> None: + """Clear internal input queue.""" + while self._input_buffer: + bundle = self._input_buffer.pop() + self._metrics.on_input_dequeued(bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_buffer: + bundle = self._output_buffer.pop() + self._metrics.on_output_dequeued(bundle) + def all_inputs_done(self) -> None: ctx = TaskContext( task_idx=self._next_task_index, diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 1c476131e296..969d03d58951 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -65,7 +65,7 @@ logger = logging.getLogger(__name__) -class MapOperator(OneToOneOperator, InternalQueueOperatorMixin, ABC): +class MapOperator(InternalQueueOperatorMixin, OneToOneOperator, ABC): """A streaming operator that maps input bundles 1:1 to output bundles. This operator implements the distributed map operation, supporting both task @@ -108,7 +108,7 @@ def __init__( self._block_ref_bundler = _BlockRefBundler(min_rows_per_bundle) # Queue for task outputs, either ordered or unordered (this is set by start()). - self._output_queue: _OutputQueue = None + self._output_queue: Optional[_OutputQueue] = None # Output metadata, added to on get_next(). self._output_blocks_stats: List[BlockStats] = [] # All active `DataOpTask`s. @@ -165,6 +165,19 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return self._output_queue.size_bytes() + def clear_internal_input_queue(self) -> None: + """Clear internal input queue (block ref bundler).""" + while self._block_ref_bundler.has_bundle(): + (input_bundles, _) = self._block_ref_bundler.get_next_bundle() + for input_bundle in input_bundles: + self._metrics.on_input_dequeued(input_bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_queue.has_next(): + bundle = self._output_queue.get_next() + self._metrics.on_output_dequeued(bundle) + @property def name(self) -> str: name = super().name diff --git a/python/ray/data/_internal/execution/operators/output_splitter.py b/python/ray/data/_internal/execution/operators/output_splitter.py index 3848bdb92b9f..628350b0c366 100644 --- a/python/ray/data/_internal/execution/operators/output_splitter.py +++ b/python/ray/data/_internal/execution/operators/output_splitter.py @@ -170,6 +170,18 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return sum(b.size_bytes() for b in self._output_queue) + def clear_internal_input_queue(self) -> None: + """Clear internal input queue.""" + while self._buffer: + bundle = self._buffer.pop() + self._metrics.on_input_dequeued(bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_queue: + bundle = self._output_queue.popleft() + self._metrics.on_output_dequeued(bundle) + def progress_str(self) -> str: if self._locality_hints: return locality_string(self._locality_hits, self._locality_misses) diff --git a/python/ray/data/_internal/execution/operators/union_operator.py b/python/ray/data/_internal/execution/operators/union_operator.py index 8d10312102fc..f4850c2d32c6 100644 --- a/python/ray/data/_internal/execution/operators/union_operator.py +++ b/python/ray/data/_internal/execution/operators/union_operator.py @@ -85,6 +85,19 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return sum(q.size_bytes() for q in self._output_buffer) + def clear_internal_input_queue(self) -> None: + """Clear internal input queues.""" + for input_buffer in self._input_buffers: + while input_buffer: + bundle = input_buffer.get_next() + self._metrics.on_input_dequeued(bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_buffer: + bundle = self._output_buffer.popleft() + self._metrics.on_output_dequeued(bundle) + def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() assert 0 <= input_index <= len(self._input_dependencies), input_index diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index dab398acf58f..a024aa47a7fb 100644 --- a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -93,6 +93,19 @@ def internal_output_queue_num_blocks(self) -> int: def internal_output_queue_num_bytes(self) -> int: return sum(bundle.size_bytes() for bundle in self._output_buffer) + def clear_internal_input_queue(self) -> None: + """Clear internal input queues.""" + for input_buffer in self._input_buffers: + while input_buffer: + bundle = input_buffer.popleft() + self._metrics.on_input_dequeued(bundle) + + def clear_internal_output_queue(self) -> None: + """Clear internal output queue.""" + while self._output_buffer: + bundle = self._output_buffer.popleft() + self._metrics.on_output_dequeued(bundle) + def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: assert not self.completed() assert 0 <= input_index <= len(self._input_dependencies), input_index diff --git a/python/ray/data/tests/test_resource_manager.py b/python/ray/data/tests/test_resource_manager.py index 5dad34912b1c..d63dec3b2f10 100644 --- a/python/ray/data/tests/test_resource_manager.py +++ b/python/ray/data/tests/test_resource_manager.py @@ -44,7 +44,7 @@ def mock_map_op( compute_strategy=compute_strategy, name=name, ) - op.start = MagicMock(side_effect=lambda _: None) + op.start(ExecutionOptions()) if incremental_resource_usage is not None: op.incremental_resource_usage = MagicMock( return_value=incremental_resource_usage