Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions ci/lint/pydoclint-baseline.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ python/ray/_private/runtime_env/plugin.py
DOC107: Method `RuntimeEnvPlugin.create`: The option `--arg-type-hints-in-signature` is `True` but not all args in the signature have type hints
DOC103: Method `RuntimeEnvPluginManager.create_uri_cache_for_plugin`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [plugin: RuntimeEnvPlugin]. Arguments in the docstring but not in the function signature: [plugin_name: ].
--------------------
python/ray/_private/runtime_env/setup_hook.py
DOC102: Function `upload_worker_process_setup_hook_if_needed`: Docstring contains more arguments than in function signature.
DOC103: Function `upload_worker_process_setup_hook_if_needed`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the docstring but not in the function signature: [decoder: ].
DOC201: Function `upload_worker_process_setup_hook_if_needed` does not have a return section in docstring
--------------------
python/ray/_private/runtime_env/utils.py
DOC103: Function `check_output_cmd`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: ]. Arguments in the docstring but not in the function signature: [kwargs: ].
--------------------
Expand Down
47 changes: 32 additions & 15 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,36 @@ def make_function_table_key(key_type: bytes, job_id: JobID, key: Optional[bytes]
return b":".join([key_type, job_id.hex().encode(), key])


def build_setup_hook_export_entry(
setup_func: Callable, job_id: JobID
) -> tuple[bytes, bytes, bytes]:
"""Compute the exported payload and GCS key for a setup hook callable.

Args:
setup_func: The setup hook function to export.
job_id: The job ID to export the setup hook for.

Returns:
A tuple of (pickled_function, function_id, key).
"""
pickled_function = pickle_dumps(
setup_func,
"Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}",
)
function_to_run_id = hashlib.shake_128(pickled_function).digest(
ray_constants.ID_SIZE
)
key = make_function_table_key(
# This value should match with gcs_function_manager.h.
# Otherwise, it won't be GC'ed.
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(),
# b"FunctionsToRun",
job_id,
function_to_run_id,
)
return pickled_function, function_to_run_id, key


class FunctionActorManager:
"""A class used to export/load remote functions and actors.
Attributes:
Expand Down Expand Up @@ -150,21 +180,8 @@ def export_setup_func(
self, setup_func: Callable, timeout: Optional[int] = None
) -> bytes:
"""Export the setup hook function and return the key."""
pickled_function = pickle_dumps(
setup_func,
"Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}",
)

function_to_run_id = hashlib.shake_128(pickled_function).digest(
ray_constants.ID_SIZE
)
key = make_function_table_key(
# This value should match with gcs_function_manager.h.
# Otherwise, it won't be GC'ed.
WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(),
# b"FunctionsToRun",
self._worker.current_job_id.binary(),
function_to_run_id,
pickled_function, function_to_run_id, key = build_setup_hook_export_entry(
setup_func, self._worker.current_job_id.binary()
)

check_oversized_function(
Expand Down
86 changes: 77 additions & 9 deletions python/ray/_private/runtime_env/setup_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import ray._private.ray_constants as ray_constants
import ray.cloudpickle as pickle
from ray._common.utils import load_class
from ray._private.function_manager import build_setup_hook_export_entry
from ray.runtime_env import RuntimeEnv

logger = logging.getLogger(__name__)
Expand All @@ -23,7 +24,7 @@ def get_import_export_timeout():
)


def _decode_function_key(key: bytes) -> str:
def decode_function_key(key: bytes) -> str:
# b64encode only includes A-Z, a-z, 0-9, + and / characters
return RUNTIME_ENV_FUNC_IDENTIFIER + base64.b64encode(key).decode()

Expand All @@ -33,6 +34,14 @@ def _encode_function_key(key: str) -> bytes:
return base64.b64decode(key[len(RUNTIME_ENV_FUNC_IDENTIFIER) :])


def _raise_setup_hook_conflict(existing_hook_value: str, setup_hook_desc: str) -> None:
raise RuntimeError(
"Conflicting worker_process_setup_hook: the setup hook env "
f"var is already set to '{existing_hook_value}', but "
f"runtime_env specifies {setup_hook_desc}."
)


def export_setup_func_callable(
runtime_env: Union[Dict[str, Any], RuntimeEnv],
setup_func: Callable,
Expand All @@ -52,9 +61,7 @@ def export_setup_func_callable(
f"The env var, {ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR}, "
"is not permitted because it is reserved for the internal use."
)
env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = _decode_function_key(
key
)
env_vars[ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR] = decode_function_key(key)
runtime_env["env_vars"] = env_vars
# Note: This field is no-op. We don't have a plugin for the setup hook
# because we can implement it simply using an env var.
Expand All @@ -79,6 +86,60 @@ def export_setup_func_module(
return runtime_env


def _check_setup_hook_consistency(
existing_hook_value: str,
setup_func: Union[Callable, str],
worker: "ray.Worker",
) -> None:
"""Validate that an already-set hook env var is consistent with setup_func.

When the env var is already populated (e.g. inherited from a job supervisor),
we compare it against the `worker_process_setup_hook` field in the runtime_env
to detect silent mismatches.

Args:
existing_hook_value: The value of the existing hook env var.
setup_func: The setup function or module path.
worker: The worker instance.

Raises:
RuntimeError: If a conflict between the existing env var and setup_func is detected.
"""
if isinstance(setup_func, Callable):
try:
_encode_function_key(existing_hook_value)
except Exception:
_raise_setup_hook_conflict(
existing_hook_value, f"callable '{setup_func.__name__}'"
)
_check_callable_hooks_match(existing_hook_value, setup_func, worker)
elif isinstance(setup_func, str):
try:
_encode_function_key(existing_hook_value)
existing_is_callable_ref = True
except Exception:
existing_is_callable_ref = False
if not existing_is_callable_ref and existing_hook_value != setup_func:
_raise_setup_hook_conflict(existing_hook_value, f"'{setup_func}'")
Comment thread
cursor[bot] marked this conversation as resolved.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callable hook re-entry raises false conflict on re-processing

Medium Severity

When a callable hook is first processed, export_setup_func_callable replaces runtime_env["worker_process_setup_hook"] with setup_func.__name__ (a plain string like "my_hook"), while setting the env var to a callable reference ("ray_runtime_env_func::..."). On re-entry, _check_setup_hook_consistency sees setup_func as a string and enters the elif isinstance(setup_func, str) branch. Since the existing env var is a callable reference, _encode_function_key succeeds, setting existing_is_callable_ref = True, which unconditionally triggers _raise_setup_hook_conflict. This makes legitimate callable hook re-entry impossible — the exact class of bug this PR aims to fix.

Additional Locations (1)

Fix in Cursor Fix in Web



def _check_callable_hooks_match(
existing_hook_value: str,
setup_func: Callable,
worker: "ray.Worker",
) -> None:
"""Verify a callable produces the same GCS key as the existing env var."""
_, _, expected_key = build_setup_hook_export_entry(
setup_func, worker.current_job_id.binary()
)
expected_env_value = decode_function_key(expected_key)

if existing_hook_value != expected_env_value:
_raise_setup_hook_conflict(
existing_hook_value, f"callable '{setup_func.__name__}'"
)


def upload_worker_process_setup_hook_if_needed(
runtime_env: Union[Dict[str, Any], RuntimeEnv],
worker: "ray.Worker",
Expand All @@ -94,17 +155,24 @@ def upload_worker_process_setup_hook_if_needed(
runtime_env: The runtime_env. The value will be modified
when returned.
worker: ray.worker instance.
decoder: GCS requires the function key to be bytes. However,
we cannot json serialize (which is required to serialize
runtime env) the bytes. So the key should be decoded to
a string. The given decoder is used to decode the function
key.

Returns:
The modified runtime_env with the setup hook processed into an env var.
"""
setup_func = runtime_env.get("worker_process_setup_hook")

if setup_func is None:
return runtime_env

env_vars = runtime_env.get("env_vars", {})
existing_hook = env_vars.get(ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR)

if existing_hook is not None:
# A setup hook is already populated (e.g. inherited from job supervisor).
# Validate that it is consistent with the current worker_process_setup_hook.
_check_setup_hook_consistency(existing_hook, setup_func, worker)
return runtime_env

if isinstance(setup_func, Callable):
return export_setup_func_callable(runtime_env, setup_func, worker)
elif isinstance(setup_func, str):
Expand Down
91 changes: 91 additions & 0 deletions python/ray/tests/test_runtime_env_setup_func.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
import sys
import tempfile
import threading
from unittest.mock import MagicMock

import pytest

import ray
import ray._private.ray_constants as ray_constants
from ray._common.test_utils import wait_for_condition
from ray._private.function_manager import build_setup_hook_export_entry
from ray._private.runtime_env.setup_hook import (
decode_function_key,
upload_worker_process_setup_hook_if_needed,
)
from ray._private.test_utils import format_web_url
from ray.job_submission import JobStatus, JobSubmissionClient

Expand Down Expand Up @@ -256,5 +263,89 @@ def verify():
os.rmdir(temp_dir)


def test_upload_hook_reprocessing_module_match():
runtime_env = {
"worker_process_setup_hook": "my.module.hook",
"env_vars": {
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook",
},
}
result = upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)
assert (
result["env_vars"][ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR]
== "my.module.hook"
)


def test_upload_hook_reprocess_callable():
"""Reprocessing accepts an already-exported callable hook."""

def my_setup():
return None

_, _, key = build_setup_hook_export_entry(my_setup, b"\x01" * 4)
callable_ref = decode_function_key(key)
runtime_env = {
"worker_process_setup_hook": "my_setup",
"env_vars": {
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: callable_ref,
},
}
result = upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)
assert (
result["env_vars"][ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR]
== callable_ref
)


def test_upload_hook_type_mismatch():
"""A callable setup_func vs a module-path env var is a type mismatch."""
runtime_env = {
"worker_process_setup_hook": lambda: None,
"env_vars": {
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook",
},
}
with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"):
upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)


def test_upload_hook_module_mismatch():
"""Conflicting module-path strings must be detected and rejected."""
runtime_env = {
"worker_process_setup_hook": "my.module.hook_a",
"env_vars": {
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: "my.module.hook_b",
},
}
with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"):
upload_worker_process_setup_hook_if_needed(runtime_env, worker=None)


def test_upload_hook_callable_mismatch():
"""Two different callables referencing different GCS keys must be rejected."""
mock_worker = MagicMock()
mock_worker.current_job_id.binary.return_value = b"\x00" * 4

def setup_a():
return None

def setup_b():
return "different"

_, _, key = build_setup_hook_export_entry(
setup_a, mock_worker.current_job_id.binary.return_value
)
existing_callable_ref = decode_function_key(key)
runtime_env = {
"worker_process_setup_hook": setup_b,
"env_vars": {
ray_constants.WORKER_PROCESS_SETUP_HOOK_ENV_VAR: existing_callable_ref,
},
}
with pytest.raises(RuntimeError, match="Conflicting worker_process_setup_hook"):
upload_worker_process_setup_hook_if_needed(runtime_env, worker=mock_worker)


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))