Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,34 @@ def internal_output_queue_num_bytes(self) -> int:
"""Returns Operator's internal output queue size (in bytes)"""
...

@abc.abstractmethod
def clear_internal_input_queue(self) -> None:
    """Discard everything buffered in the internal input queue(s).

    Implementations are expected to remove every pending input bundle and to
    report each removal through on_input_dequeued() so that the metrics stay
    consistent with the (now empty) queue.
    """
    ...

@abc.abstractmethod
def clear_internal_output_queue(self) -> None:
    """Discard everything buffered in the internal output queue(s).

    Implementations are expected to remove every pending output bundle and to
    report each removal through on_output_dequeued() so that the metrics stay
    consistent with the (now empty) queue.
    """
    ...

def mark_execution_finished(self) -> None:
    """Finish execution, then drop whatever is still buffered internally.

    The next mark_execution_finished() in the method resolution order runs
    first; afterwards the internal input queue and then the internal output
    queue are cleared.
    """
    # Delegate first so the operator is flagged as finished before any
    # buffered bundles are discarded.
    super().mark_execution_finished()
    # Input side is drained before the output side.
    self.clear_internal_input_queue()
    self.clear_internal_output_queue()


class OneToOneOperator(PhysicalOperator):
"""An operator that has one input and one output dependency.
Expand Down Expand Up @@ -137,6 +165,18 @@ def internal_output_queue_num_blocks(self) -> int:
def internal_output_queue_num_bytes(self) -> int:
    """Return the combined size, in bytes, of the bundles in the output buffer."""
    total_bytes = 0
    for buffered in self._output_buffer:
        total_bytes += buffered.size_bytes()
    return total_bytes

def clear_internal_input_queue(self) -> None:
    """Empty the input buffer, reporting each removed bundle to the metrics."""
    pending = self._input_buffer
    while pending:
        # pop() takes from the end of the buffer; every removed bundle is
        # passed to on_input_dequeued().
        self._metrics.on_input_dequeued(pending.pop())

def clear_internal_output_queue(self) -> None:
    """Empty the output buffer, reporting each removed bundle to the metrics."""
    pending = self._output_buffer
    while pending:
        # pop() takes from the end of the buffer; every removed bundle is
        # passed to on_output_dequeued().
        self._metrics.on_output_dequeued(pending.pop())

def all_inputs_done(self) -> None:
ctx = TaskContext(
task_idx=self._next_task_index,
Expand Down
17 changes: 15 additions & 2 deletions python/ray/data/_internal/execution/operators/map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
logger = logging.getLogger(__name__)


class MapOperator(OneToOneOperator, InternalQueueOperatorMixin, ABC):
class MapOperator(InternalQueueOperatorMixin, OneToOneOperator, ABC):
"""A streaming operator that maps input bundles 1:1 to output bundles.

This operator implements the distributed map operation, supporting both task
Expand Down Expand Up @@ -108,7 +108,7 @@ def __init__(
self._block_ref_bundler = _BlockRefBundler(min_rows_per_bundle)

# Queue for task outputs, either ordered or unordered (this is set by start()).
self._output_queue: _OutputQueue = None
self._output_queue: Optional[_OutputQueue] = None
# Output metadata, added to on get_next().
self._output_blocks_stats: List[BlockStats] = []
# All active `DataOpTask`s.
Expand Down Expand Up @@ -165,6 +165,19 @@ def internal_output_queue_num_blocks(self) -> int:
def internal_output_queue_num_bytes(self) -> int:
    """Return the size, in bytes, reported by the operator's output queue."""
    # NOTE(review): the output queue appears to be unset until start() runs;
    # confirm callers only ask for this size after start().
    output_queue = self._output_queue
    return output_queue.size_bytes()

def clear_internal_input_queue(self) -> None:
    """Drain the block ref bundler, reporting each input bundle as dequeued."""
    bundler = self._block_ref_bundler
    # NOTE(review): only bundles that has_bundle() reports are drained here;
    # confirm the bundler cannot retain anything it does not report.
    while bundler.has_bundle():
        # get_next_bundle() returns a pair; only its first element (the
        # input bundles) is used.
        drained_bundles, _unused = bundler.get_next_bundle()
        for drained in drained_bundles:
            self._metrics.on_input_dequeued(drained)

def clear_internal_output_queue(self) -> None:
    """Clear internal output queue.

    Removes every bundle the output queue still holds and reports each one
    through on_output_dequeued(). If the output queue has not been created
    yet (it is None until start() sets it), there is nothing to drain and
    the call is a no-op instead of raising AttributeError.
    """
    output_queue = self._output_queue
    if output_queue is None:
        # Nothing can be buffered before the queue exists.
        return
    while output_queue.has_next():
        self._metrics.on_output_dequeued(output_queue.get_next())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Output queue accessed before initialization.

The clear_internal_output_queue method accesses self._output_queue without checking if it's None. Since _output_queue is initialized to None in __init__ and only set in start(), calling mark_execution_finished() before start() (which happens in InputDataBuffer.__init__) causes an AttributeError when trying to call self._output_queue.has_next().

Fix in Cursor Fix in Web

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can't happen because we always call start() on an operator.


@property
def name(self) -> str:
name = super().name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,18 @@ def internal_output_queue_num_blocks(self) -> int:
def internal_output_queue_num_bytes(self) -> int:
    """Return the combined size, in bytes, of the bundles in the output queue."""
    total_bytes = 0
    for queued in self._output_queue:
        total_bytes += queued.size_bytes()
    return total_bytes

def clear_internal_input_queue(self) -> None:
    """Empty the input buffer, reporting each removed bundle to the metrics."""
    pending = self._buffer
    while pending:
        # pop() removes one bundle per iteration; each one is reported via
        # on_input_dequeued().
        self._metrics.on_input_dequeued(pending.pop())

def clear_internal_output_queue(self) -> None:
    """Empty the output queue, reporting each removed bundle to the metrics."""
    pending = self._output_queue
    while pending:
        # popleft() removes from the front, so bundles are reported in
        # queue order.
        self._metrics.on_output_dequeued(pending.popleft())

def progress_str(self) -> str:
if self._locality_hints:
return locality_string(self._locality_hits, self._locality_misses)
Expand Down
13 changes: 13 additions & 0 deletions python/ray/data/_internal/execution/operators/union_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ def internal_output_queue_num_blocks(self) -> int:
def internal_output_queue_num_bytes(self) -> int:
    """Return the summed size_bytes() of every entry in the output buffer."""
    total_bytes = 0
    for entry in self._output_buffer:
        total_bytes += entry.size_bytes()
    return total_bytes

def clear_internal_input_queue(self) -> None:
    """Empty every input buffer, reporting each removed bundle to the metrics."""
    for pending in self._input_buffers:
        # NOTE(review): the loop relies on the buffer being falsy once it is
        # empty; confirm the buffer type supports that.
        while pending:
            self._metrics.on_input_dequeued(pending.get_next())

def clear_internal_output_queue(self) -> None:
    """Empty the output buffer, reporting each removed bundle to the metrics."""
    pending = self._output_buffer
    while pending:
        # Bundles leave from the front, one per iteration.
        self._metrics.on_output_dequeued(pending.popleft())

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert 0 <= input_index <= len(self._input_dependencies), input_index
Expand Down
13 changes: 13 additions & 0 deletions python/ray/data/_internal/execution/operators/zip_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ def internal_output_queue_num_blocks(self) -> int:
def internal_output_queue_num_bytes(self) -> int:
    """Return the combined size, in bytes, of the bundles in the output buffer."""
    total_bytes = 0
    for buffered in self._output_buffer:
        total_bytes += buffered.size_bytes()
    return total_bytes

def clear_internal_input_queue(self) -> None:
    """Empty every input buffer, reporting each removed bundle to the metrics."""
    for pending in self._input_buffers:
        while pending:
            # Bundles leave from the front of each buffer, one per iteration.
            self._metrics.on_input_dequeued(pending.popleft())

def clear_internal_output_queue(self) -> None:
    """Empty the output buffer, reporting each removed bundle to the metrics."""
    pending = self._output_buffer
    while pending:
        # Bundles leave from the front, one per iteration.
        self._metrics.on_output_dequeued(pending.popleft())

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert 0 <= input_index <= len(self._input_dependencies), input_index
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def mock_map_op(
compute_strategy=compute_strategy,
name=name,
)
op.start = MagicMock(side_effect=lambda _: None)
op.start(ExecutionOptions())
if incremental_resource_usage is not None:
op.incremental_resource_usage = MagicMock(
return_value=incremental_resource_usage
Expand Down