diff --git a/python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py b/python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py index 3002d185e936..64505703a90e 100644 --- a/python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py +++ b/python/ray/data/_internal/actor_autoscaler/default_actor_autoscaler.py @@ -51,7 +51,7 @@ def _derive_target_scaling_config( op_state: "OpState", ) -> ActorPoolScalingRequest: # If all inputs have been consumed, short-circuit - if op.completed() or ( + if op.has_completed() or ( op._inputs_complete and op_state.total_enqueued_input_blocks() == 0 ): return ActorPoolScalingRequest.downscale( diff --git a/python/ray/data/_internal/execution/interfaces/physical_operator.py b/python/ray/data/_internal/execution/interfaces/physical_operator.py index 9d62485a1d11..bc2d3f289e53 100644 --- a/python/ray/data/_internal/execution/interfaces/physical_operator.py +++ b/python/ray/data/_internal/execution/interfaces/physical_operator.py @@ -427,7 +427,7 @@ def has_execution_finished(self) -> bool: and internal_input_queue_num_blocks == 0 ) - def completed(self) -> bool: + def has_completed(self) -> bool: """Returns whether this operator has been fully completed. An operator is completed iff: @@ -584,7 +584,7 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: raise NotImplementedError def input_done(self, input_index: int) -> None: - """Called when the upstream operator at index `input_index` has completed(). + """Called when the upstream operator at index `input_index` has_completed(). After this is called, the executor guarantees that no more inputs will be added via `add_input` for the given input index. @@ -592,7 +592,7 @@ def input_done(self, input_index: int) -> None: pass def all_inputs_done(self) -> None: - """Called when all upstream operators have completed(). + """Called when all upstream operators has_completed(). After this is called, the executor guarantees that no more inputs will be added via `add_input` for any input index. diff --git a/python/ray/data/_internal/execution/operators/base_physical_operator.py b/python/ray/data/_internal/execution/operators/base_physical_operator.py index 2bd1a5c9f794..ff1245b9e940 100644 --- a/python/ray/data/_internal/execution/operators/base_physical_operator.py +++ b/python/ray/data/_internal/execution/operators/base_physical_operator.py @@ -148,7 +148,7 @@ def num_output_rows_total(self) -> Optional[int]: ) def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: - assert not self.completed() + assert not self.has_completed() assert input_index == 0, input_index self._input_buffer.append(refs) self._metrics.on_input_queued(refs) diff --git a/python/ray/data/_internal/execution/operators/hash_shuffle.py b/python/ray/data/_internal/execution/operators/hash_shuffle.py index b7a2b649eb6a..fc334fc1c18c 100644 --- a/python/ray/data/_internal/execution/operators/hash_shuffle.py +++ b/python/ray/data/_internal/execution/operators/hash_shuffle.py @@ -1027,9 +1027,9 @@ def incremental_resource_usage(self) -> ExecutionResources: gpu=0, ) - def completed(self) -> bool: + def has_completed(self) -> bool: # TODO separate marking as completed from the check - return self._is_finalized() and super().completed() + return self._is_finalized() and super().has_completed() def implements_accurate_memory_accounting(self) -> bool: return True diff --git a/python/ray/data/_internal/execution/operators/limit_operator.py b/python/ray/data/_internal/execution/operators/limit_operator.py index 327a08e76031..0bb2dd800a9b 100644 --- a/python/ray/data/_internal/execution/operators/limit_operator.py +++ b/python/ray/data/_internal/execution/operators/limit_operator.py @@ -37,7 +37,7 @@ def _limit_reached(self) -> bool: return self._consumed_rows >= self._limit def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: - assert not self.completed() + assert not self.has_completed() assert input_index == 0, input_index if self._limit_reached(): return diff --git a/python/ray/data/_internal/execution/operators/map_operator.py b/python/ray/data/_internal/execution/operators/map_operator.py index 0441c1817f7f..06de1e87e7e6 100644 --- a/python/ray/data/_internal/execution/operators/map_operator.py +++ b/python/ray/data/_internal/execution/operators/map_operator.py @@ -712,7 +712,7 @@ def num_active_tasks(self) -> int: # metadata tasks, which are used by the actor-pool map operator to # check if a newly created actor is ready. # The reasons are because: - # 1. `PhysicalOperator.completed` checks `num_active_tasks`. The operator + # 1. `PhysicalOperator.has_completed` checks `num_active_tasks`. The operator # should be considered completed if there are still pending actors. # 2. The number of active tasks in the progress bar will be more accurate # to reflect the actual data processing tasks. diff --git a/python/ray/data/_internal/execution/operators/union_operator.py b/python/ray/data/_internal/execution/operators/union_operator.py index f4850c2d32c6..0933ef506e87 100644 --- a/python/ray/data/_internal/execution/operators/union_operator.py +++ b/python/ray/data/_internal/execution/operators/union_operator.py @@ -99,7 +99,7 @@ def clear_internal_output_queue(self) -> None: self._metrics.on_output_dequeued(bundle) def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: - assert not self.completed() + assert not self.has_completed() assert 0 <= input_index <= len(self._input_dependencies), input_index if not self._preserve_order: diff --git a/python/ray/data/_internal/execution/operators/zip_operator.py b/python/ray/data/_internal/execution/operators/zip_operator.py index a024aa47a7fb..b0b1676a0f92 100644 --- a/python/ray/data/_internal/execution/operators/zip_operator.py +++ b/python/ray/data/_internal/execution/operators/zip_operator.py @@ -107,7 +107,7 @@ def clear_internal_output_queue(self) -> None: self._metrics.on_output_dequeued(bundle) def _add_input_inner(self, refs: RefBundle, input_index: int) -> None: - assert not self.completed() + assert not self.has_completed() assert 0 <= input_index <= len(self._input_dependencies), input_index self._input_buffers[input_index].append(refs) self._metrics.on_input_queued(refs) diff --git a/python/ray/data/_internal/execution/resource_manager.py b/python/ray/data/_internal/execution/resource_manager.py index 1f6d387ab169..9a49c689c460 100644 --- a/python/ray/data/_internal/execution/resource_manager.py +++ b/python/ray/data/_internal/execution/resource_manager.py @@ -401,7 +401,7 @@ def get_op_internal_object_store_usage(self, op: PhysicalOperator) -> int: def _get_first_pending_shuffle_op(topology: "Topology") -> int: for idx, op in enumerate(topology): - if _is_shuffle_op(op) and not op.completed(): + if _is_shuffle_op(op) and not op.has_completed(): return idx return -1 diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py index f506afe06986..8e7db6114e47 100644 --- a/python/ray/data/_internal/execution/streaming_executor.py +++ b/python/ray/data/_internal/execution/streaming_executor.py @@ -523,7 +523,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: # Log metrics of newly completed operators. for op, state in topology.items(): - if op.completed() and not self._has_op_completed[op]: + if op.has_completed() and not self._has_op_completed[op]: log_str = ( f"Operator {op} completed. " f"Operator Metrics:\n{op._metrics.as_dict(skip_internal_metrics=True)}" @@ -533,7 +533,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool: self._validate_operator_queues_empty(op, state) # Keep going until all operators run to completion. - return not all(op.completed() for op in topology) + return not all(op.has_completed() for op in topology) def _refresh_progress_bars(self, topology: Topology): # Update the progress bar to reflect scheduling decisions. diff --git a/python/ray/data/_internal/execution/streaming_executor_state.py b/python/ray/data/_internal/execution/streaming_executor_state.py index a0362156e4c9..0c67a2a266d6 100644 --- a/python/ray/data/_internal/execution/streaming_executor_state.py +++ b/python/ray/data/_internal/execution/streaming_executor_state.py @@ -647,7 +647,7 @@ def update_operator_states(topology: Topology) -> None: continue all_inputs_done = True for idx, dep in enumerate(op.input_dependencies): - if dep.completed() and not topology[dep].output_queue: + if dep.has_completed() and not topology[dep].output_queue: if not op_state.input_done_called[idx]: op.input_done(idx) op_state.input_done_called[idx] = True @@ -664,7 +664,7 @@ def update_operator_states(topology: Topology) -> None: for op, op_state in reversed(list(topology.items())): dependents_completed = len(op.output_dependencies) > 0 and all( - dep.completed() for dep in op.output_dependencies + dep.has_completed() for dep in op.output_dependencies ) if dependents_completed: op.mark_execution_finished() @@ -713,7 +713,11 @@ def get_eligible_operators( # - It's not completed # - It can accept at least one input # - Its input queue has a valid bundle - if not op.completed() and op.should_add_input() and state.has_pending_bundles(): + if ( + not op.has_completed() + and op.should_add_input() + and state.has_pending_bundles() + ): if not in_backpressure: op_runnable = True eligible_ops.append(op) diff --git a/python/ray/data/tests/test_actor_pool_map_operator.py b/python/ray/data/tests/test_actor_pool_map_operator.py index 6444a407604e..3b4237f333bf 100644 --- a/python/ray/data/tests/test_actor_pool_map_operator.py +++ b/python/ray/data/tests/test_actor_pool_map_operator.py @@ -791,7 +791,7 @@ def test_completed_when_downstream_op_has_finished_execution(ray_start_regular_s # ASSERT: Since the downstream operator has finished execution, the actor pool # operator should consider itself completed. - assert actor_pool_map_op.completed() + assert actor_pool_map_op.has_completed() def test_actor_pool_fault_tolerance_e2e(ray_start_cluster, restore_data_context): diff --git a/python/ray/data/tests/test_autoscaler.py b/python/ray/data/tests/test_autoscaler.py index e31991ef99fd..28015079c74f 100644 --- a/python/ray/data/tests/test_autoscaler.py +++ b/python/ray/data/tests/test_autoscaler.py @@ -63,7 +63,7 @@ def test_actor_pool_scaling(): op = MagicMock( spec=InternalQueueOperatorMixin, - completed=MagicMock(return_value=False), + has_completed=MagicMock(return_value=False), _inputs_complete=False, input_dependencies=[MagicMock()], internal_queue_num_blocks=MagicMock(return_value=1), @@ -138,7 +138,7 @@ def assert_autoscaling_action( # Should scale down since if the op is completed, or # the op has no more inputs. - with patch(op, "completed", True): + with patch(op, "has_completed", True): # NOTE: We simulate actor pool dipping below min size upon # completion (to verify that it will be able to scale to 0) with patch(actor_pool, "current_size", 5): @@ -227,7 +227,7 @@ def autoscaler_max_upscaling_delta_setup(): op = MagicMock( spec=InternalQueueOperatorMixin, - completed=MagicMock(return_value=False), + has_completed=MagicMock(return_value=False), _inputs_complete=False, ) op_state = MagicMock( diff --git a/python/ray/data/tests/test_operators.py b/python/ray/data/tests/test_operators.py index f82de04a982a..d5d61cc0a326 100644 --- a/python/ray/data/tests/test_operators.py +++ b/python/ray/data/tests/test_operators.py @@ -138,9 +138,9 @@ def test_input_data_buffer(ray_start_regular_shared): op = InputDataBuffer(DataContext.get_current(), inputs) # Check we return all bundles in order. - assert not op.completed() + assert not op.has_completed() assert _take_outputs(op) == [[1, 2], [3], [4, 5]] - assert op.completed() + assert op.has_completed() def test_all_to_all_operator(): @@ -180,13 +180,13 @@ def dummy_all_transform(bundles: List[RefBundle], ctx): op.all_inputs_done() # Check we return transformed bundles. - assert not op.completed() + assert not op.has_completed() outputs = _take_outputs(op) expected = [[1, 2], [3, 4]] assert sorted(outputs) == expected, f"Expected {expected}, got {outputs}" stats = op.get_stats() assert "FooStats" in stats - assert op.completed() + assert op.has_completed() def test_num_outputs_total(): @@ -271,7 +271,7 @@ def test_map_operator_streamed(ray_start_regular_shared, use_actors): else: assert "locality_hits" not in metrics, metrics assert "locality_misses" not in metrics, metrics - assert not op.completed() + assert not op.has_completed() @pytest.mark.parametrize("equal", [False, True]) @@ -574,7 +574,7 @@ def test_map_operator_actor_locality_stats(ray_start_regular_shared): # Check e2e locality manager working. assert metrics["locality_hits"] == 100, metrics assert metrics["locality_misses"] == 0, metrics - assert not op.completed() + assert not op.has_completed() @pytest.mark.parametrize("use_actors", [False, True]) @@ -610,7 +610,7 @@ def _check_batch(block_iter: Iterable[Block], ctx) -> Iterable[Block]: run_op_tasks_sync(op) _take_outputs(op) - assert op.completed() + assert op.has_completed() def _run_map_operator_test( @@ -661,7 +661,7 @@ def _run_map_operator_test( while op.has_next(): outputs.append(op.get_next()) assert len(outputs) == expected_blocks - assert op.completed() + assert op.has_completed() @pytest.mark.parametrize("use_actors", [False, True]) @@ -808,7 +808,7 @@ def test_map_operator_ray_args(shutdown_only, use_actors): outputs = _take_outputs(op) expected = [[i * 2] for i in range(10)] assert sorted(outputs) == expected, f"Expected {expected}, got {outputs}" - assert op.completed() + assert op.has_completed() @pytest.mark.parametrize("use_actors", [False, True]) @@ -1006,7 +1006,7 @@ def _map_transfom_fn(block_iter: Iterable[Block], _) -> Iterable[Block]: while op.has_next(): op.get_next() assert actor_pool.num_pending_actors() == num_actors - assert op.completed() + assert op.has_completed() @pytest.mark.parametrize( @@ -1048,13 +1048,13 @@ def test_limit_operator(ray_start_regular_shared): ) if limit == 0: # If the limit is 0, the operator should be completed immediately. - assert limit_op.completed() + assert limit_op.has_completed() assert limit_op._limit_reached() cur_rows = 0 loop_count = 0 while input_op.has_next() and not limit_op._limit_reached(): loop_count += 1 - assert not limit_op.completed(), limit + assert not limit_op.has_completed(), limit assert not limit_op.has_execution_finished(), limit limit_op.add_input(input_op.get_next(), 0) while limit_op.has_next(): @@ -1064,19 +1064,19 @@ def test_limit_operator(ray_start_regular_shared): cur_rows += num_rows_per_block if cur_rows >= limit: assert limit_op.mark_execution_finished.call_count == 1, limit - assert limit_op.completed(), limit + assert limit_op.has_completed(), limit assert limit_op._limit_reached(), limit assert limit_op.has_execution_finished(), limit else: assert limit_op.mark_execution_finished.call_count == 0, limit - assert not limit_op.completed(), limit + assert not limit_op.has_completed(), limit assert not limit_op._limit_reached(), limit assert not limit_op.has_execution_finished(), limit limit_op.mark_execution_finished() # After inputs done, the number of output bundles # should be the same as the number of `add_input`s. assert limit_op.num_outputs_total() == loop_count, limit - assert limit_op.completed(), limit + assert limit_op.has_completed(), limit def test_limit_operator_memory_leak_fix(ray_start_regular_shared, tmp_path): @@ -1524,7 +1524,7 @@ def map_fn(block_iter: Iterable[Block], ctx: TaskContext) -> Iterable[Block]: run_op_tasks_sync(op) _take_outputs(op) - assert op.completed() + assert op.has_completed() def test_limit_estimated_num_output_bundles(): diff --git a/python/ray/data/tests/test_streaming_executor.py b/python/ray/data/tests/test_streaming_executor.py index 870d1652402a..64ddd00f6693 100644 --- a/python/ray/data/tests/test_streaming_executor.py +++ b/python/ray/data/tests/test_streaming_executor.py @@ -197,7 +197,7 @@ def test_process_completed_tasks(sleep_task_ref, ray_start_regular_shared): o2.get_active_tasks = MagicMock(return_value=[done_task]) o2.all_inputs_done = MagicMock() o1.mark_execution_finished = MagicMock() - o1.completed = MagicMock(return_value=True) + o1.has_completed = MagicMock(return_value=True) topo[o1].output_queue.clear() process_completed_tasks(topo, [], 0) update_operator_states(topo) @@ -320,7 +320,7 @@ def _get_eligible_ops_to_run(ensure_liveness: bool): assert _get_eligible_ops_to_run(ensure_liveness=False) == [o3] # Completed ops are not eligible - with patch.object(o3, "completed") as _mock: + with patch.object(o3, "has_completed") as _mock: _mock.return_value = True assert _get_eligible_ops_to_run(ensure_liveness=False) == [o2] @@ -342,7 +342,7 @@ def _get_eligible_ops_to_run_with_policy(ensure_liveness: bool): assert _get_eligible_ops_to_run_with_policy(ensure_liveness=False) == [o3] # Complete `o3` - with patch.object(o3, "completed") as _mock: + with patch.object(o3, "has_completed") as _mock: _mock.return_value = True # Clear up input queue topo[o3].input_queues[0].clear()