Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions ci/lint/pydoclint-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ python/ray/_private/runtime_env/plugin.py
DOC107: Method `RuntimeEnvPlugin.create`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC103: Method `RuntimeEnvPluginManager.create_uri_cache_for_plugin`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [plugin: RuntimeEnvPlugin]. Arguments in the docstring but not in the function signature: [plugin_name: ].
--------------------
python/ray/_private/runtime_env/setup_hook.py
DOC102: Function `upload_worker_process_setup_hook_if_needed`: Docstring contains more arguments than in function signature.
DOC103: Function `upload_worker_process_setup_hook_if_needed`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the docstring but not in the function signature: [decoder: ].
DOC201: Function `upload_worker_process_setup_hook_if_needed` does not have a return section in docstring
--------------------
python/ray/_private/runtime_env/utils.py
DOC103: Function `check_output_cmd`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: ]. Arguments in the docstring but not in the function signature: [kwargs: ].
--------------------
Expand Down
47 changes: 32 additions & 15 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,36 @@ def make_function_table_key(key_type: bytes, job_id: JobID, key: Optional[bytes]
return b":".join([key_type, job_id.hex().encode(), key])


def build_setup_hook_export_entry(
    setup_func: Callable, job_id: JobID
) -> tuple[bytes, bytes, bytes]:
    """Compute the exported payload and GCS key for a setup hook callable.

    The function ID is a SHAKE-128 digest of the pickled callable, so
    identical pickled bytes always yield the same ID, and therefore the same
    key for a given ``job_id``.

    Args:
        setup_func: The setup hook function to export.
        job_id: The job ID to export the setup hook for. It is forwarded
            unchanged to ``make_function_table_key``.

    Returns:
        A tuple of (pickled_function, function_id, key).
    """
    # Not every callable carries ``__name__`` (``functools.partial`` objects
    # do not). Fall back to repr() so that building the error message cannot
    # itself fail with AttributeError before serialization is attempted.
    setup_func_name = getattr(setup_func, "__name__", repr(setup_func))
    pickled_function = pickle_dumps(
        setup_func,
        f"Cannot serialize the worker_process_setup_hook {setup_func_name}",
    )
    function_to_run_id = hashlib.shake_128(pickled_function).digest(
        ray_constants.ID_SIZE
    )
    key = make_function_table_key(
        # This value should match with gcs_function_manager.h.
        # Otherwise, it won't be GC'ed.
        WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(),
        # b"FunctionsToRun",
        job_id,
        function_to_run_id,
    )
    return pickled_function, function_to_run_id, key


class FunctionActorManager:
"""A class used to export/load remote functions and actors.
Attributes:
Expand Down Expand Up @@ -150,21 +180,8 @@ def export_setup_func(
self, setup_func: Callable, timeout: Optional[int] = None
) -> bytes:
"""Export the setup hook function and return the key."""
pickled_function = pickle_dumps(
setup_func,
"Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}",
)

function_to_run_id = hashlib.shake_128(pickled_function).digest(
ray_constants.ID_SIZE
)
key = make_function_table_key(
# This value should match with gcs_function_manager.h.
# Otherwise, it won't be GC'ed.
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(),
# b"FunctionsToRun",
self._worker.current_job_id.binary(),
function_to_run_id,
pickled_function, function_to_run_id, key = build_setup_hook_export_entry(
setup_func, self._worker.current_job_id.binary()
)

check_oversized_function(
Expand Down
86 changes: 77 additions & 9 deletions python/ray/_private/runtime_env/setup_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ray._private.ray_constants as ray_constants
import ray.cloudpickle as pickle
from ray._common.utils import load_class
from ray._private.function_manager import build_setup_hook_export_entry
from ray.runtime_env import RuntimeEnv

logger = logging.getLogger(__name__)
Expand All @@ -23,7 +24,7 @@ def get_import_export_timeout():
)


def _decode_function_key(key: bytes) -> str:
def decode_function_key(key: bytes) -> str:
# b64encode output only contains A-Z, a-z, 0-9, +, / and = (padding) characters
return RUNTIME_ENV_FUNC_IDENTIFIER + base64.b64encode(key).decode()

Expand All @@ -33,6 +34,14 @@ def _encode_function_key(key: str) -> bytes:
return base64.b64decode(key[len(RUNTIME_ENV_FUNC_IDENTIFIER) :])


def _raise_setup_hook_conflict(existing_hook_value: str, setup_hook_desc: str) -> None:
raise RuntimeError(
"Conflicting worker_process_setup_hook: the setup hook env "
f"var is already set to '{existing_hook_value}', but "
f"runtime_env specifies {setup_hook_desc}."
)


def export_setup_func_callable(
runtime_env: Union[Dict[str, Any], RuntimeEnv],
setup_func: Callable,
Expand All @@ -52,9 +61,7 @@ def export_setup_func_callable(
f"The env var, {ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR}, "
"is not permitted because it is reserved for the internal use."
)
env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = _decode_function_key(
key
)
env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = decode_function_key(key)
runtime_env["env_vars"] = env_vars
# Note: This field is no-op. We don't have a plugin for the setup hook
# because we can implement it simply using an env var.
Expand All @@ -79,6 +86,60 @@ def export_setup_func_module(
return runtime_env


def _check_setup_hook_consistency(
    existing_hook_value: str,
    setup_func: Union[Callable, str],
    worker: "ray.Worker",
) -> None:
    """Validate that an already-set hook env var is consistent with setup_func.

    When the env var is already populated (e.g. inherited from a job supervisor),
    we compare it against the `worker_process_setup_hook` field in the runtime_env
    to detect silent mismatches.

    Args:
        existing_hook_value: The value of the existing hook env var.
        setup_func: The setup function or module path.
        worker: The worker instance.

    Raises:
        RuntimeError: If a conflict between the existing env var and
            setup_func is detected.
    """
    setup_is_callable = isinstance(setup_func, Callable)
    if not setup_is_callable and not isinstance(setup_func, str):
        # Neither a callable nor a module path: there is nothing to compare.
        return

    # `_encode_function_key` is used purely as a probe: if it accepts the
    # value, the env var holds an encoded callable reference; any failure
    # means it holds something else (e.g. a module path). The probe runs once,
    # and conflicts are raised outside the `except` so the resulting
    # RuntimeError is not chained to the unrelated probe failure.
    try:
        _encode_function_key(existing_hook_value)
    except Exception:
        existing_is_callable_ref = False
    else:
        existing_is_callable_ref = True

    if setup_is_callable:
        if not existing_is_callable_ref:
            # Callables such as `functools.partial` have no `__name__`; fall
            # back to repr() so the conflict is reported as a RuntimeError
            # instead of being masked by an AttributeError.
            hook_name = getattr(setup_func, "__name__", repr(setup_func))
            _raise_setup_hook_conflict(existing_hook_value, f"callable '{hook_name}'")
        _check_callable_hooks_match(existing_hook_value, setup_func, worker)
    elif existing_is_callable_ref or existing_hook_value != setup_func:
        # NOTE(review): a string hook is always rejected against a
        # callable-ref env var, whatever the string is; confirm this is the
        # intended outcome when a runtime_env is processed more than once.
        _raise_setup_hook_conflict(existing_hook_value, f"'{setup_func}'")
Comment thread
cursor[bot] marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callable hook re-entry raises false conflict on re-processing

Medium Severity

When a callable hook is first processed, export_setup_func_callable replaces runtime_env["worker_process_setup_hook"] with setup_func.__name__ (a plain string like "my_hook"), while setting the env var to a callable reference ("ray_runtime_env_func::..."). On re-entry, _check_setup_hook_consistency sees setup_func as a string and enters the elif isinstance(setup_func, str) branch. Since the existing env var is a callable reference, _encode_function_key succeeds, setting existing_is_callable_ref = True, which unconditionally triggers _raise_setup_hook_conflict. This makes legitimate callable hook re-entry impossible — the exact class of bug this PR aims to fix.

Additional Locations (1)

Fix in Cursor Fix in Web



def _check_callable_hooks_match(
    existing_hook_value: str,
    setup_func: Callable,
    worker: "ray.Worker",
) -> None:
    """Verify a callable produces the same GCS key as the existing env var.

    Args:
        existing_hook_value: The value of the existing hook env var.
        setup_func: The callable setup hook to compare against.
        worker: The worker instance; its `current_job_id` is used to build
            the expected key.

    Raises:
        RuntimeError: If the env var value derived from setup_func differs
            from existing_hook_value.
    """
    _, _, expected_key = build_setup_hook_export_entry(
        setup_func, worker.current_job_id.binary()
    )
    if existing_hook_value == decode_function_key(expected_key):
        return

    # Callables such as `functools.partial` have no `__name__`; fall back to
    # repr() so the mismatch surfaces as the intended RuntimeError rather than
    # an AttributeError raised while formatting the message.
    hook_name = getattr(setup_func, "__name__", repr(setup_func))
    _raise_setup_hook_conflict(existing_hook_value, f"callable '{hook_name}'")


def upload_worker_process_setup_hook_if_needed(
runtime_env: Union[Dict[str, Any], RuntimeEnv],
worker: "ray.Worker",
Expand All @@ -94,17 +155,24 @@ def upload_worker_process_setup_hook_if_needed(
runtime_env: The runtime_env. The value will be modified
when returned.
worker: ray.worker instance.
decoder: GCS requires the function key to be bytes. However,
we cannot json serialize (which is required to serialize
runtime env) the bytes. So the key should be decoded to
a string. The given decoder is used to decode the function
key.

Returns:
The modified runtime_env with the setup hook processed into an env var.
"""
setup_func = runtime_env.get("worker_process_setup_hook")

if setup_func is None:
return runtime_env

env_vars = runtime_env.get("env_vars", {})
existing_hook = env_vars.get(ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR)

if existing_hook is not None:
# A setup hook is already populated (e.g. inherited from job supervisor).
# Validate that it is consistent with the current worker_process_setup_hook.
_check_setup_hook_consistency(existing_hook, setup_func, worker)
return runtime_env

if isinstance(setup_func, Callable):
return export_setup_func_callable(runtime_env, setup_func, worker)
elif isinstance(setup_func, str):
Expand Down
88 changes: 88 additions & 0 deletions python/ray/tests/test_runtime_env_setup_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
import sys
import tempfile
import threading
from unittest.mock import MagicMock

import pytest

import ray
import ray._private.ray_constants as ray_constants
from ray._common.test_utils import wait_for_condition
from ray._private.function_manager import build_setup_hook_export_entry
from ray._private.runtime_env.setup_hook import (
decode_function_key,
upload_worker_process_setup_hook_if_needed,
)
from ray._private.test_utils import format_web_url
from ray.job_submission import JobStatus, JobSubmissionClient

Expand Down Expand Up @@ -256,5 +263,86 @@ def verify():
os.rmdir(temp_dir)


def test_upload_hook_reprocessing_module_match():
    """A module-path hook whose env var already holds the same path is accepted."""
    hook_env_var = ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR
    module_path = "my.module.hook"
    runtime_env = {
        "worker_process_setup_hook": module_path,
        "env_vars": {hook_env_var: module_path},
    }

    processed = upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)

    # The env var must come back untouched.
    assert processed["env_vars"][hook_env_var] == module_path


def test_upload_hook_type_mismatch():
    """A callable hook must be rejected when the env var holds a module path."""
    inherited_env_vars = {
        ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook",
    }
    runtime_env = {
        "worker_process_setup_hook": lambda: None,
        "env_vars": inherited_env_vars,
    }

    expected_error = pytest.raises(
        RuntimeError, match="Conflicting worker_process_setup_hook"
    )
    with expected_error:
        upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)


def test_upload_hook_str_callable_ref_mismatch():
    """A module-path hook must be rejected when the env var holds a callable ref."""

    def existing_hook():
        return None

    # Build the env var value that exporting `existing_hook` would produce.
    job_id_bytes = b"\x01" * 4
    *_, exported_key = build_setup_hook_export_entry(existing_hook, job_id_bytes)
    inherited_env_vars = {
        ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: decode_function_key(
            exported_key
        ),
    }
    runtime_env = {
        "worker_process_setup_hook": "totally.different.module",
        "env_vars": inherited_env_vars,
    }

    expected_error = pytest.raises(
        RuntimeError, match="Conflicting worker_process_setup_hook"
    )
    with expected_error:
        upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)


def test_upload_hook_module_mismatch():
    """Two different module-path strings must be reported as a conflict."""
    inherited_env_vars = {
        ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook_b",
    }
    runtime_env = {
        "worker_process_setup_hook": "my.module.hook_a",
        "env_vars": inherited_env_vars,
    }

    expected_error = pytest.raises(
        RuntimeError, match="Conflicting worker_process_setup_hook"
    )
    with expected_error:
        upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)


def test_upload_hook_callable_mismatch():
    """A callable hook must be rejected when the env var refers to another one."""
    job_id_bytes = b"\x00" * 4
    mock_worker = MagicMock()
    mock_worker.current_job_id.binary.return_value = job_id_bytes

    def setup_a():
        return None

    def setup_b():
        return "different"

    # The env var refers to `setup_a`, while runtime_env supplies `setup_b`.
    *_, key_for_setup_a = build_setup_hook_export_entry(setup_a, job_id_bytes)
    inherited_env_vars = {
        ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: decode_function_key(
            key_for_setup_a
        ),
    }
    runtime_env = {
        "worker_process_setup_hook": setup_b,
        "env_vars": inherited_env_vars,
    }

    expected_error = pytest.raises(
        RuntimeError, match="Conflicting worker_process_setup_hook"
    )
    with expected_error:
        upload_worker_process_setup_hook_if_needed(runtime_env, worker=mock_worker)


if __name__ == "__main__":
    # Running the module directly executes its tests and forwards pytest's
    # exit status to the caller.
    exit_status = pytest.main(["-sv", __file__])
    sys.exit(exit_status)