diff --git a/ci/lint/pydoclint-baseline.txt b/ci/lint/pydoclint-baseline.txt index fef2281bcdc5..0eea3aa5d84b 100644 --- a/ci/lint/pydoclint-baseline.txt +++ b/ci/lint/pydoclint-baseline.txt @@ -156,11 +156,6 @@ python/ray/_private/runtime_env/plugin.py DOC107: Method `RuntimeEnvPlugin.create`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints DOC103: Method `RuntimeEnvPluginManager.create_uri_cache_for_plugin`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [plugin: RuntimeEnvPlugin]. Arguments in the docstring but not in the function signature: [plugin_name: ]. -------------------- -python/ray/_private/runtime_env/setup_hook.py - DOC102: Function `upload_worker_process_setup_hook_if_needed`: Docstring contains more arguments than in function signature. - DOC103: Function `upload_worker_process_setup_hook_if_needed`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the docstring but not in the function signature: [decoder: ]. - DOC201: Function `upload_worker_process_setup_hook_if_needed` does not have a return section in docstring --------------------- python/ray/_private/runtime_env/utils.py DOC103: Function `check_output_cmd`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: ]. Arguments in the docstring but not in the function signature: [kwargs: ]. -------------------- diff --git a/python/ray/_private/function_manager.py b/python/ray/_private/function_manager.py index 2dba86b484d6..8c4132f54cc1 100644 --- a/python/ray/_private/function_manager.py +++ b/python/ray/_private/function_manager.py @@ -56,6 +56,36 @@ def make_function_table_key(key_type: bytes, job_id: JobID, key: Optional[bytes] return b":".join([key_type, job_id.hex().encode(), key]) +def build_setup_hook_export_entry( + setup_func: Callable, job_id: JobID +) -> tuple[bytes, bytes, bytes]: + """Compute the exported payload and GCS key for a setup hook callable. + + Args: + setup_func: The setup hook function to export. + job_id: The job ID to export the setup hook for. + + Returns: + A tuple of (pickled_function, function_id, key). + """ + pickled_function = pickle_dumps( + setup_func, + "Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}", + ) + function_to_run_id = hashlib.shake_128(pickled_function).digest( + ray_constants.ID_SIZE + ) + key = make_function_table_key( + # This value should match with gcs_function_manager.h. + # Otherwise, it won't be GC'ed. + WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(), + # b"FunctionsToRun", + job_id, + function_to_run_id, + ) + return pickled_function, function_to_run_id, key + + class FunctionActorManager: """A class used to export/load remote functions and actors. Attributes: @@ -150,21 +180,8 @@ def export_setup_func( self, setup_func: Callable, timeout: Optional[int] = None ) -> bytes: """Export the setup hook function and return the key.""" - pickled_function = pickle_dumps( - setup_func, - "Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}", - ) - - function_to_run_id = hashlib.shake_128(pickled_function).digest( - ray_constants.ID_SIZE - ) - key = make_function_table_key( - # This value should match with gcs_function_manager.h. - # Otherwise, it won't be GC'ed. - WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(), - # b"FunctionsToRun", - self._worker.current_job_id.binary(), - function_to_run_id, + pickled_function, function_to_run_id, key = build_setup_hook_export_entry( + setup_func, self._worker.current_job_id.binary() ) check_oversized_function( diff --git a/python/ray/_private/runtime_env/setup_hook.py b/python/ray/_private/runtime_env/setup_hook.py index e6c1283df274..dae73105d78d 100644 --- a/python/ray/_private/runtime_env/setup_hook.py +++ b/python/ray/_private/runtime_env/setup_hook.py @@ -8,6 +8,7 @@ import ray._private.ray_constants as ray_constants import ray.cloudpickle as pickle from ray._common.utils import load_class +from ray._private.function_manager import build_setup_hook_export_entry from ray.runtime_env import RuntimeEnv logger = logging.getLogger(__name__) @@ -23,7 +24,7 @@ def get_import_export_timeout(): ) -def _decode_function_key(key: bytes) -> str: +def decode_function_key(key: bytes) -> str: # b64encode only includes A-Z, a-z, 0-9, + and / characters return RUNTIME_ENV_FUNC_IDENTIFIER + base64.b64encode(key).decode() @@ -33,6 +34,14 @@ def _encode_function_key(key: str) -> bytes: return base64.b64decode(key[len(RUNTIME_ENV_FUNC_IDENTIFIER) :]) +def _raise_setup_hook_conflict(existing_hook_value: str, setup_hook_desc: str) -> None: + raise RuntimeError( + "Conflicting worker_process_setup_hook: the setup hook env " + f"var is already set to '{existing_hook_value}', but " + f"runtime_env specifies {setup_hook_desc}." + ) + + def export_setup_func_callable( runtime_env: Union[Dict[str, Any], RuntimeEnv], setup_func: Callable, @@ -52,9 +61,7 @@ def export_setup_func_callable( f"The env var, {ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR}, " "is not permitted because it is reserved for the internal use." ) - env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = _decode_function_key( - key - ) + env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = decode_function_key(key) runtime_env["env_vars"] = env_vars # Note: This field is no-op. We don't have a plugin for the setup hook # because we can implement it simply using an env var. @@ -79,6 +86,60 @@ def export_setup_func_module( return runtime_env +def _check_setup_hook_consistency( + existing_hook_value: str, + setup_func: Union[Callable, str], + worker: "ray.Worker", +) -> None: + """Validate that an already-set hook env var is consistent with setup_func. + + When the env var is already populated (e.g. inherited from a job supervisor), + we compare it against the `worker_process_setup_hook` field in the runtime_env + to detect silent mismatches. + + Args: + existing_hook_value: The value of the existing hook env var. + setup_func: The setup function or module path. + worker: The worker instance. + + Raises: + RuntimeError: If a conflict between the existing env var and setup_func is detected. + """ + if isinstance(setup_func, Callable): + try: + _encode_function_key(existing_hook_value) + except Exception: + _raise_setup_hook_conflict( + existing_hook_value, f"callable '{setup_func.__name__}'" + ) + _check_callable_hooks_match(existing_hook_value, setup_func, worker) + elif isinstance(setup_func, str): + try: + _encode_function_key(existing_hook_value) + existing_is_callable_ref = True + except Exception: + existing_is_callable_ref = False + if existing_is_callable_ref or existing_hook_value != setup_func: + _raise_setup_hook_conflict(existing_hook_value, f"'{setup_func}'") + + +def _check_callable_hooks_match( + existing_hook_value: str, + setup_func: Callable, + worker: "ray.Worker", +) -> None: + """Verify a callable produces the same GCS key as the existing env var.""" + _, _, expected_key = build_setup_hook_export_entry( + setup_func, worker.current_job_id.binary() + ) + expected_env_value = decode_function_key(expected_key) + + if existing_hook_value != expected_env_value: + _raise_setup_hook_conflict( + existing_hook_value, f"callable '{setup_func.__name__}'" + ) + + def upload_worker_process_setup_hook_if_needed( runtime_env: Union[Dict[str, Any], RuntimeEnv], worker: "ray.Worker", @@ -94,17 +155,24 @@ def upload_worker_process_setup_hook_if_needed( runtime_env: The runtime_env. The value will be modified when returned. worker: ray.worker instance. - decoder: GCS requires the function key to be bytes. However, - we cannot json serialize (which is required to serialize - runtime env) the bytes. So the key should be decoded to - a string. The given decoder is used to decode the function - key. + + Returns: + The modified runtime_env with the setup hook processed into an env var. """ setup_func = runtime_env.get("worker_process_setup_hook") if setup_func is None: return runtime_env + env_vars = runtime_env.get("env_vars", {}) + existing_hook = env_vars.get(ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR) + + if existing_hook is not None: + # A setup hook is already populated (e.g. inherited from job supervisor). + # Validate that it is consistent with the current worker_process_setup_hook. + _check_setup_hook_consistency(existing_hook, setup_func, worker) + return runtime_env + if isinstance(setup_func, Callable): return export_setup_func_callable(runtime_env, setup_func, worker) elif isinstance(setup_func, str): diff --git a/python/ray/tests/test_runtime_env_setup_func.py b/python/ray/tests/test_runtime_env_setup_func.py index 218478ee11c8..885999b6aba0 100644 --- a/python/ray/tests/test_runtime_env_setup_func.py +++ b/python/ray/tests/test_runtime_env_setup_func.py @@ -4,11 +4,18 @@ import sys import tempfile import threading +from unittest.mock import MagicMock import pytest import ray +import ray._private.ray_constants as ray_constants from ray._common.test_utils import wait_for_condition +from ray._private.function_manager import build_setup_hook_export_entry +from ray._private.runtime_env.setup_hook import ( + decode_function_key, + upload_worker_process_setup_hook_if_needed, +) from ray._private.test_utils import format_web_url from ray.job_submission import JobStatus, JobSubmissionClient @@ -256,5 +263,86 @@ def verify(): os.rmdir(temp_dir) +def test_upload_hook_reprocessing_module_match(): + runtime_env = { + "worker_process_setup_hook": "my.module.hook", + "env_vars": { + ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook", + }, + } + result = upload_worker_process_setup_hook_if_needed(runtime_env, worker=None) + assert ( + result["env_vars"][ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] + == "my.module.hook" + ) + + +def test_upload_hook_type_mismatch(): + """A callable setup_func vs a module-path env var is a type mismatch.""" + runtime_env = { + "worker_process_setup_hook": lambda: None, + "env_vars": { + ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook", + }, + } + with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"): + upload_worker_process_setup_hook_if_needed(runtime_env, worker=None) + + +def test_upload_hook_str_callable_ref_mismatch(): + """A module-path setup_func vs a callable-ref env var is a type mismatch.""" + + def existing_hook(): + return None + + _, _, key = build_setup_hook_export_entry(existing_hook, b"\x01" * 4) + callable_ref = decode_function_key(key) + runtime_env = { + "worker_process_setup_hook": "totally.different.module", + "env_vars": { + ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: callable_ref, + }, + } + with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"): + upload_worker_process_setup_hook_if_needed(runtime_env, worker=None) + + +def test_upload_hook_module_mismatch(): + """Conflicting module-path strings must be detected and rejected.""" + runtime_env = { + "worker_process_setup_hook": "my.module.hook_a", + "env_vars": { + ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook_b", + }, + } + with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"): + upload_worker_process_setup_hook_if_needed(runtime_env, worker=None) + + +def test_upload_hook_callable_mismatch(): + """Two different callables referencing different GCS keys must be rejected.""" + mock_worker = MagicMock() + mock_worker.current_job_id.binary.return_value = b"\x00" * 4 + + def setup_a(): + return None + + def setup_b(): + return "different" + + _, _, key = build_setup_hook_export_entry( + setup_a, mock_worker.current_job_id.binary.return_value + ) + existing_callable_ref = decode_function_key(key) + runtime_env = { + "worker_process_setup_hook": setup_b, + "env_vars": { + ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: existing_callable_ref, + }, + } + with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"): + upload_worker_process_setup_hook_if_needed(runtime_env, worker=mock_worker) + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__]))