@@ -102,7 +102,7 @@ def _derive_target_scaling_config(
         op_state: "OpState",
     ) -> ActorPoolScalingRequest:
         # If all inputs have been consumed, short-circuit
-        if op.completed() or (
+        if op.has_completed() or (
             op._inputs_complete and op_state.total_enqueued_input_blocks() == 0
         ):
             num_to_scale_down = self._compute_downscale_delta(actor_pool)
@@ -426,7 +426,7 @@ def has_execution_finished(self) -> bool:
             and internal_input_queue_num_blocks == 0
         )

-    def completed(self) -> bool:
+    def has_completed(self) -> bool:
         """Returns whether this operator has been fully completed.

         An operator is completed iff:
@@ -583,15 +583,15 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
         raise NotImplementedError

     def input_done(self, input_index: int) -> None:
-        """Called when the upstream operator at index `input_index` has completed().
+        """Called when the upstream operator at index `input_index` has_completed().

         After this is called, the executor guarantees that no more inputs will be added
         via `add_input` for the given input index.
         """
         pass

     def all_inputs_done(self) -> None:
-        """Called when all upstream operators have completed().
+        """Called when all upstream operators has_completed().

         After this is called, the executor guarantees that no more inputs will be added
         via `add_input` for any input index.
@@ -147,7 +147,7 @@ def num_output_rows_total(self) -> Optional[int]:
         )

     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
-        assert not self.completed()
+        assert not self.has_completed()
         assert input_index == 0, input_index
         self._input_buffer.append(refs)
         self._metrics.on_input_queued(refs)
@@ -998,9 +998,9 @@ def incremental_resource_usage(self) -> ExecutionResources:
             gpu=0,
         )

-    def completed(self) -> bool:
+    def has_completed(self) -> bool:
         # TODO separate marking as completed from the check
-        return self._is_finalized() and super().completed()
+        return self._is_finalized() and super().has_completed()

     def implements_accurate_memory_accounting(self) -> bool:
         return True
@@ -37,7 +37,7 @@ def _limit_reached(self) -> bool:
         return self._consumed_rows >= self._limit

     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
-        assert not self.completed()
+        assert not self.has_completed()
         assert input_index == 0, input_index
         if self._limit_reached():
             return
@@ -712,7 +712,7 @@ def num_active_tasks(self) -> int:
         # metadata tasks, which are used by the actor-pool map operator to
         # check if a newly created actor is ready.
         # The reasons are because:
-        # 1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator
+        # 1. `PhysicalOperator.has_completed` checks `num_active_tasks`. The operator
         #    should be considered completed if there are still pending actors.
         # 2. The number of active tasks in the progress bar will be more accurate
         #    to reflect the actual data processing tasks.
@@ -99,7 +99,7 @@ def clear_internal_output_queue(self) -> None:
             self._metrics.on_output_dequeued(bundle)

     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
-        assert not self.completed()
+        assert not self.has_completed()
         assert 0 <= input_index <= len(self._input_dependencies), input_index

         if not self._preserve_order:
@@ -107,7 +107,7 @@ def clear_internal_output_queue(self) -> None:
             self._metrics.on_output_dequeued(bundle)

     def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
-        assert not self.completed()
+        assert not self.has_completed()
         assert 0 <= input_index <= len(self._input_dependencies), input_index
         self._input_buffers[input_index].append(refs)
         self._metrics.on_input_queued(refs)
python/ray/data/_internal/execution/resource_manager.py (2 changes: 1 addition & 1 deletion)
@@ -412,7 +412,7 @@ def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:

 def _get_first_pending_shuffle_op(topology: "Topology") -> int:
     for idx, op in enumerate(topology):
-        if _is_shuffle_op(op) and not op.completed():
+        if _is_shuffle_op(op) and not op.has_completed():
             return idx

     return -1
python/ray/data/_internal/execution/streaming_executor.py (4 changes: 2 additions & 2 deletions)
@@ -504,7 +504,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:

         # Log metrics of newly completed operators.
         for op, state in topology.items():
-            if op.completed() and not self._has_op_completed[op]:
+            if op.has_completed() and not self._has_op_completed[op]:
                 log_str = (
                     f"Operator {op} completed. "
                     f"Operator Metrics:\n{op._metrics.as_dict(skip_internal_metrics=True)}"
@@ -514,7 +514,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
                 self._validate_operator_queues_empty(op, state)

         # Keep going until all operators run to completion.
-        return not all(op.completed() for op in topology)
+        return not all(op.has_completed() for op in topology)

     def _refresh_progress_manager(self, topology: Topology):
         # Update the progress manager to reflect scheduling decisions.
@@ -531,7 +531,7 @@ def update_operator_states(topology: Topology) -> None:
             continue
         all_inputs_done = True
         for idx, dep in enumerate(op.input_dependencies):
-            if dep.completed() and not topology[dep].output_queue:
+            if dep.has_completed() and not topology[dep].output_queue:
                 if not op_state.input_done_called[idx]:
                     op.input_done(idx)
                     op_state.input_done_called[idx] = True
@@ -548,7 +548,7 @@ def update_operator_states(topology: Topology) -> None:
     for op, op_state in reversed(list(topology.items())):

         dependents_completed = len(op.output_dependencies) > 0 and all(
-            dep.completed() for dep in op.output_dependencies
+            dep.has_completed() for dep in op.output_dependencies
         )
         if dependents_completed:
             op.mark_execution_finished()
@@ -597,7 +597,11 @@ def get_eligible_operators(
             # - It's not completed
             # - It can accept at least one input
             # - Its input queue has a valid bundle
-            if not op.completed() and op.should_add_input() and state.has_pending_bundles():
+            if (
+                not op.has_completed()
+                and op.should_add_input()
+                and state.has_pending_bundles()
+            ):
                 if not in_backpressure:
                     op_runnable = True
                     eligible_ops.append(op)
python/ray/data/tests/test_actor_pool_map_operator.py (2 changes: 1 addition & 1 deletion)
@@ -791,7 +791,7 @@ def test_completed_when_downstream_op_has_finished_execution(ray_start_regular_s

     # ASSERT: Since the downstream operator has finished execution, the actor pool
     # operator should consider itself completed.
-    assert actor_pool_map_op.completed()
+    assert actor_pool_map_op.has_completed()


 def test_actor_pool_fault_tolerance_e2e(ray_start_cluster, restore_data_context):
python/ray/data/tests/test_autoscaler.py (6 changes: 3 additions & 3 deletions)
@@ -63,7 +63,7 @@ def test_actor_pool_scaling():

     op = MagicMock(
         spec=InternalQueueOperatorMixin,
-        completed=MagicMock(return_value=False),
+        has_completed=MagicMock(return_value=False),
         _inputs_complete=False,
         input_dependencies=[MagicMock()],
         internal_input_queue_num_blocks=MagicMock(return_value=1),
@@ -148,7 +148,7 @@ def assert_autoscaling_action(

     # Should scale down since if the op is completed, or
     # the op has no more inputs.
-    with patch(op, "completed", True):
+    with patch(op, "has_completed", True):
         # NOTE: We simulate actor pool dipping below min size upon
         # completion (to verify that it will be able to scale to 0)
         with patch(actor_pool, "current_size", 5):
@@ -237,7 +237,7 @@ def autoscaler_max_upscaling_delta_setup():

     op = MagicMock(
         spec=InternalQueueOperatorMixin,
-        completed=MagicMock(return_value=False),
+        has_completed=MagicMock(return_value=False),
         _inputs_complete=False,
         metrics=MagicMock(average_num_inputs_per_task=1),
     )
python/ray/data/tests/test_operators.py (32 changes: 16 additions & 16 deletions)
@@ -138,9 +138,9 @@ def test_input_data_buffer(ray_start_regular_shared):
     op = InputDataBuffer(DataContext.get_current(), inputs)

     # Check we return all bundles in order.
-    assert not op.completed()
+    assert not op.has_completed()
     assert _take_outputs(op) == [[1, 2], [3], [4, 5]]
-    assert op.completed()
+    assert op.has_completed()


 def test_all_to_all_operator():
@@ -180,13 +180,13 @@ def dummy_all_transform(bundles: List[RefBundle], ctx):
     op.all_inputs_done()

     # Check we return transformed bundles.
-    assert not op.completed()
+    assert not op.has_completed()
     outputs = _take_outputs(op)
     expected = [[1, 2], [3, 4]]
     assert sorted(outputs) == expected, f"Expected {expected}, got {outputs}"
     stats = op.get_stats()
     assert "FooStats" in stats
-    assert op.completed()
+    assert op.has_completed()


 def test_num_outputs_total():
@@ -271,7 +271,7 @@ def test_map_operator_streamed(ray_start_regular_shared, use_actors):
     else:
         assert "locality_hits" not in metrics, metrics
         assert "locality_misses" not in metrics, metrics
-    assert not op.completed()
+    assert not op.has_completed()


 @pytest.mark.parametrize("equal", [False, True])
@@ -574,7 +574,7 @@ def test_map_operator_actor_locality_stats(ray_start_regular_shared):
     # Check e2e locality manager working.
     assert metrics["locality_hits"] == 100, metrics
     assert metrics["locality_misses"] == 0, metrics
-    assert not op.completed()
+    assert not op.has_completed()


 @pytest.mark.parametrize("use_actors", [False, True])
@@ -610,7 +610,7 @@ def _check_batch(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
     run_op_tasks_sync(op)

     _take_outputs(op)
-    assert op.completed()
+    assert op.has_completed()


 def _run_map_operator_test(
@@ -661,7 +661,7 @@ def _run_map_operator_test(
     while op.has_next():
         outputs.append(op.get_next())
     assert len(outputs) == expected_blocks
-    assert op.completed()
+    assert op.has_completed()


 @pytest.mark.parametrize("use_actors", [False, True])
@@ -808,7 +808,7 @@ def test_map_operator_ray_args(shutdown_only, use_actors):
     outputs = _take_outputs(op)
     expected = [[i * 2] for i in range(10)]
     assert sorted(outputs) == expected, f"Expected {expected}, got {outputs}"
-    assert op.completed()
+    assert op.has_completed()


 @pytest.mark.parametrize("use_actors", [False, True])
@@ -1006,7 +1006,7 @@ def _map_transfom_fn(block_iter: Iterable[Block], _) -> Iterable[Block]:
     while op.has_next():
         op.get_next()
     assert actor_pool.num_pending_actors() == num_actors
-    assert op.completed()
+    assert op.has_completed()


 @pytest.mark.parametrize(
@@ -1048,13 +1048,13 @@ def test_limit_operator(ray_start_regular_shared):
     )
     if limit == 0:
         # If the limit is 0, the operator should be completed immediately.
-        assert limit_op.completed()
+        assert limit_op.has_completed()
         assert limit_op._limit_reached()
     cur_rows = 0
     loop_count = 0
     while input_op.has_next() and not limit_op._limit_reached():
         loop_count += 1
-        assert not limit_op.completed(), limit
+        assert not limit_op.has_completed(), limit
         assert not limit_op.has_execution_finished(), limit
         limit_op.add_input(input_op.get_next(), 0)
         while limit_op.has_next():
@@ -1064,19 +1064,19 @@ def test_limit_operator(ray_start_regular_shared):
             cur_rows += num_rows_per_block
         if cur_rows >= limit:
             assert limit_op.mark_execution_finished.call_count == 1, limit
-            assert limit_op.completed(), limit
+            assert limit_op.has_completed(), limit
             assert limit_op._limit_reached(), limit
             assert limit_op.has_execution_finished(), limit
         else:
             assert limit_op.mark_execution_finished.call_count == 0, limit
-            assert not limit_op.completed(), limit
+            assert not limit_op.has_completed(), limit
             assert not limit_op._limit_reached(), limit
             assert not limit_op.has_execution_finished(), limit
     limit_op.mark_execution_finished()
     # After inputs done, the number of output bundles
     # should be the same as the number of `add_input`s.
     assert limit_op.num_outputs_total() == loop_count, limit
-    assert limit_op.completed(), limit
+    assert limit_op.has_completed(), limit


 def test_limit_operator_memory_leak_fix(ray_start_regular_shared, tmp_path):
@@ -1524,7 +1524,7 @@ def map_fn(block_iter: Iterable[Block], ctx: TaskContext) -> Iterable[Block]:
     run_op_tasks_sync(op)

     _take_outputs(op)
-    assert op.completed()
+    assert op.has_completed()


 def test_limit_estimated_num_output_bundles():
python/ray/data/tests/test_streaming_executor.py (6 changes: 3 additions & 3 deletions)
@@ -197,7 +197,7 @@ def test_process_completed_tasks(sleep_task_ref, ray_start_regular_shared):
     o2.get_active_tasks = MagicMock(return_value=[done_task])
     o2.all_inputs_done = MagicMock()
     o1.mark_execution_finished = MagicMock()
-    o1.completed = MagicMock(return_value=True)
+    o1.has_completed = MagicMock(return_value=True)
     topo[o1].output_queue.clear()
     process_completed_tasks(topo, [], 0)
     update_operator_states(topo)
@@ -320,7 +320,7 @@ def _get_eligible_ops_to_run(ensure_liveness: bool):
     assert _get_eligible_ops_to_run(ensure_liveness=False) == [o3]

     # Completed ops are not eligible
-    with patch.object(o3, "completed") as _mock:
+    with patch.object(o3, "has_completed") as _mock:
         _mock.return_value = True
         assert _get_eligible_ops_to_run(ensure_liveness=False) == [o2]

@@ -342,7 +342,7 @@ def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool):
     assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3]

     # Complete `o3`
-    with patch.object(o3, "completed") as _mock:
+    with patch.object(o3, "has_completed") as _mock:
         _mock.return_value = True
         # Clear up input queue
         topo[o3].input_queues[0].clear()
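Taken together, the hunks above all touch one pattern: the streaming executor keeps stepping its topology until every operator's `has_completed()` returns true, and an operator only counts as completed once its upstream inputs are exhausted and its queues are drained. Below is a minimal sketch of that loop shape for orientation; `ToyOperator` and `step` are hypothetical stand-ins, not Ray's actual `PhysicalOperator` or `_scheduling_loop_step`.

```python
# A self-contained sketch of the loop-termination pattern in this diff.
# ToyOperator / step are hypothetical stand-ins, not Ray's real API.
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyOperator:
    name: str
    inputs_complete: bool = False
    enqueued_blocks: List[int] = field(default_factory=list)

    def has_completed(self) -> bool:
        # Completed only when upstream input is exhausted and the internal
        # queue is drained, mirroring the renamed predicate's contract.
        return self.inputs_complete and not self.enqueued_blocks


def step(topology: List[ToyOperator]) -> bool:
    """Process one block per eligible operator; True means keep scheduling."""
    for op in topology:
        if not op.has_completed() and op.enqueued_blocks:
            op.enqueued_blocks.pop()  # stand-in for dispatching one task
    # Same shape as the `return not all(op.has_completed() ...)` hunk above.
    return not all(op.has_completed() for op in topology)


if __name__ == "__main__":
    ops = [ToyOperator("read", True, [1, 2]), ToyOperator("map", True, [3])]
    while step(ops):
        pass
    assert all(op.has_completed() for op in ops)
```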