Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def _derive_target_scaling_config(
op_state: "OpState",
) -> ActorPoolScalingRequest:
# If all inputs have been consumed, short-circuit
if op.completed() or (
if op.has_completed() or (
op._inputs_complete and op_state.total_enqueued_input_blocks() == 0
):
return ActorPoolScalingRequest.downscale(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ def has_execution_finished(self) -> bool:
and internal_input_queue_num_blocks == 0
)

def completed(self) -> bool:
def has_completed(self) -> bool:
"""Returns whether this operator has been fully completed.

An operator is completed iff:
Expand Down Expand Up @@ -584,15 +584,15 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
raise NotImplementedError

def input_done(self, input_index: int) -> None:
"""Called when the upstream operator at index `input_index` has completed().
"""Called when the upstream operator at index `input_index` has_completed().

After this is called, the executor guarantees that no more inputs will be added
via `add_input` for the given input index.
"""
pass

def all_inputs_done(self) -> None:
"""Called when all upstream operators have completed().
"""Called when all upstream operators has_completed().

After this is called, the executor guarantees that no more inputs will be added
via `add_input` for any input index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def num_output_rows_total(self) -> Optional[int]:
)

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert not self.has_completed()
assert input_index == 0, input_index
self._input_buffer.append(refs)
self._metrics.on_input_queued(refs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,9 +1027,9 @@ def incremental_resource_usage(self) -> ExecutionResources:
gpu=0,
)

def completed(self) -> bool:
def has_completed(self) -> bool:
# TODO separate marking as completed from the check
return self._is_finalized() and super().completed()
return self._is_finalized() and super().has_completed()

def implements_accurate_memory_accounting(self) -> bool:
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def _limit_reached(self) -> bool:
return self._consumed_rows >= self._limit

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert not self.has_completed()
assert input_index == 0, input_index
if self._limit_reached():
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def clear_internal_output_queue(self) -> None:
self._metrics.on_output_dequeued(bundle)

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert not self.has_completed()
assert 0 <= input_index <= len(self._input_dependencies), input_index

if not self._preserve_order:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def clear_internal_output_queue(self) -> None:
self._metrics.on_output_dequeued(bundle)

def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert not self.has_completed()
assert 0 <= input_index <= len(self._input_dependencies), input_index
self._input_buffers[input_index].append(refs)
self._metrics.on_input_queued(refs)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ def get_op_internal_object_store_usage(self, op: PhysicalOperator) -> int:

def _get_first_pending_shuffle_op(topology: "Topology") -> int:
for idx, op in enumerate(topology):
if _is_shuffle_op(op) and not op.completed():
if _is_shuffle_op(op) and not op.has_completed():
return idx

return -1
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:

# Log metrics of newly completed operators.
for op, state in topology.items():
if op.completed() and not self._has_op_completed[op]:
if op.has_completed() and not self._has_op_completed[op]:
log_str = (
f"Operator {op} completed. "
f"Operator Metrics:\n{op._metrics.as_dict(skip_internal_metrics=True)}"
Expand All @@ -533,7 +533,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
self._validate_operator_queues_empty(op, state)

# Keep going until all operators run to completion.
return not all(op.completed() for op in topology)
return not all(op.has_completed() for op in topology)

def _refresh_progress_bars(self, topology: Topology):
# Update the progress bar to reflect scheduling decisions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ def update_operator_states(topology: Topology) -> None:
continue
all_inputs_done = True
for idx, dep in enumerate(op.input_dependencies):
if dep.completed() and not topology[dep].output_queue:
if dep.has_completed() and not topology[dep].output_queue:
if not op_state.input_done_called[idx]:
op.input_done(idx)
op_state.input_done_called[idx] = True
Expand All @@ -664,7 +664,7 @@ def update_operator_states(topology: Topology) -> None:
for op, op_state in reversed(list(topology.items())):

dependents_completed = len(op.output_dependencies) > 0 and all(
dep.completed() for dep in op.output_dependencies
dep.has_completed() for dep in op.output_dependencies
)
if dependents_completed:
op.mark_execution_finished()
Expand Down Expand Up @@ -713,7 +713,11 @@ def get_eligible_operators(
# - It's not completed
# - It can accept at least one input
# - Its input queue has a valid bundle
if not op.completed() and op.should_add_input() and state.has_pending_bundles():
if (
not op.has_completed()
and op.should_add_input()
and state.has_pending_bundles()
):
if not in_backpressure:
op_runnable = True
eligible_ops.append(op)
Expand Down
2 changes: 1 addition & 1 deletion python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ def test_completed_when_downstream_op_has_finished_execution(ray_start_regular_s

# ASSERT: Since the downstream operator has finished execution, the actor pool
# operator should consider itself completed.
assert actor_pool_map_op.completed()
assert actor_pool_map_op.has_completed()


def test_actor_pool_fault_tolerance_e2e(ray_start_cluster, restore_data_context):
Expand Down
32 changes: 16 additions & 16 deletions python/ray/data/tests/test_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ def test_input_data_buffer(ray_start_regular_shared):
op = InputDataBuffer(DataContext.get_current(), inputs)

# Check we return all bundles in order.
assert not op.completed()
assert not op.has_completed()
assert _take_outputs(op) == [[1, 2], [3], [4, 5]]
assert op.completed()
assert op.has_completed()


def test_all_to_all_operator():
Expand Down Expand Up @@ -180,13 +180,13 @@ def dummy_all_transform(bundles: List[RefBundle], ctx):
op.all_inputs_done()

# Check we return transformed bundles.
assert not op.completed()
assert not op.has_completed()
outputs = _take_outputs(op)
expected = [[1, 2], [3, 4]]
assert sorted(outputs) == expected, f"Expected {expected}, got {outputs}"
stats = op.get_stats()
assert "FooStats" in stats
assert op.completed()
assert op.has_completed()


def test_num_outputs_total():
Expand Down Expand Up @@ -271,7 +271,7 @@ def test_map_operator_streamed(ray_start_regular_shared, use_actors):
else:
assert "locality_hits" not in metrics, metrics
assert "locality_misses" not in metrics, metrics
assert not op.completed()
assert not op.has_completed()


@pytest.mark.parametrize("equal", [False, True])
Expand Down Expand Up @@ -574,7 +574,7 @@ def test_map_operator_actor_locality_stats(ray_start_regular_shared):
# Check e2e locality manager working.
assert metrics["locality_hits"] == 100, metrics
assert metrics["locality_misses"] == 0, metrics
assert not op.completed()
assert not op.has_completed()


@pytest.mark.parametrize("use_actors", [False, True])
Expand Down Expand Up @@ -610,7 +610,7 @@ def _check_batch(block_iter: Iterable[Block], ctx) -> Iterable[Block]:
run_op_tasks_sync(op)

_take_outputs(op)
assert op.completed()
assert op.has_completed()


def _run_map_operator_test(
Expand Down Expand Up @@ -661,7 +661,7 @@ def _run_map_operator_test(
while op.has_next():
outputs.append(op.get_next())
assert len(outputs) == expected_blocks
assert op.completed()
assert op.has_completed()


@pytest.mark.parametrize("use_actors", [False, True])
Expand Down Expand Up @@ -808,7 +808,7 @@ def test_map_operator_ray_args(shutdown_only, use_actors):
outputs = _take_outputs(op)
expected = [[i * 2] for i in range(10)]
assert sorted(outputs) == expected, f"Expected {expected}, got {outputs}"
assert op.completed()
assert op.has_completed()


@pytest.mark.parametrize("use_actors", [False, True])
Expand Down Expand Up @@ -1006,7 +1006,7 @@ def _map_transfom_fn(block_iter: Iterable[Block], _) -> Iterable[Block]:
while op.has_next():
op.get_next()
assert actor_pool.num_pending_actors() == num_actors
assert op.completed()
assert op.has_completed()


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1048,13 +1048,13 @@ def test_limit_operator(ray_start_regular_shared):
)
if limit == 0:
# If the limit is 0, the operator should be completed immediately.
assert limit_op.completed()
assert limit_op.has_completed()
assert limit_op._limit_reached()
cur_rows = 0
loop_count = 0
while input_op.has_next() and not limit_op._limit_reached():
loop_count += 1
assert not limit_op.completed(), limit
assert not limit_op.has_completed(), limit
assert not limit_op.has_execution_finished(), limit
limit_op.add_input(input_op.get_next(), 0)
while limit_op.has_next():
Expand All @@ -1064,19 +1064,19 @@ def test_limit_operator(ray_start_regular_shared):
cur_rows += num_rows_per_block
if cur_rows >= limit:
assert limit_op.mark_execution_finished.call_count == 1, limit
assert limit_op.completed(), limit
assert limit_op.has_completed(), limit
assert limit_op._limit_reached(), limit
assert limit_op.has_execution_finished(), limit
else:
assert limit_op.mark_execution_finished.call_count == 0, limit
assert not limit_op.completed(), limit
assert not limit_op.has_completed(), limit
assert not limit_op._limit_reached(), limit
assert not limit_op.has_execution_finished(), limit
limit_op.mark_execution_finished()
# After inputs done, the number of output bundles
# should be the same as the number of `add_input`s.
assert limit_op.num_outputs_total() == loop_count, limit
assert limit_op.completed(), limit
assert limit_op.has_completed(), limit


def test_limit_operator_memory_leak_fix(ray_start_regular_shared, tmp_path):
Expand Down Expand Up @@ -1524,7 +1524,7 @@ def map_fn(block_iter: Iterable[Block], ctx: TaskContext) -> Iterable[Block]:
run_op_tasks_sync(op)

_take_outputs(op)
assert op.completed()
assert op.has_completed()


def test_limit_estimated_num_output_bundles():
Expand Down