Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e97f7ba
import the custom classes in controller build step
abrarsheikh Oct 3, 2025
12bb32c
fix
abrarsheikh Oct 3, 2025
7983f6b
fix api
abrarsheikh Oct 3, 2025
f00f616
check for type
abrarsheikh Oct 3, 2025
61aa96d
Merge branch 'master' into SERVE-1134-abrar-request_router
abrarsheikh Oct 15, 2025
069cfbc
refactor
abrarsheikh Oct 15, 2025
391b214
Merge branch 'master' into SERVE-1134-abrar-request_router
abrarsheikh Oct 16, 2025
20e4162
fix tests
abrarsheikh Oct 16, 2025
a704d62
nits
abrarsheikh Oct 16, 2025
66c42b1
force cloud pickle to serialize by value
abrarsheikh Oct 17, 2025
3746185
add back None check
abrarsheikh Oct 17, 2025
c0819e9
remove policy check
abrarsheikh Oct 17, 2025
4a0a295
Merge branch 'master' of github.com:ray-project/ray into SERVE-1134-a…
abrarsheikh Oct 17, 2025
d608e7d
fix test
abrarsheikh Oct 17, 2025
af0972f
use name
abrarsheikh Oct 17, 2025
e24d8a0
Merge branch 'master' of github.com:ray-project/ray into SERVE-1134-a…
abrarsheikh Oct 17, 2025
8e036ad
Merge branch 'master' of github.com:ray-project/ray into SERVE-1134-a…
abrarsheikh Oct 17, 2025
267cf96
serialize
abrarsheikh Oct 17, 2025
48ba0e5
add newline
abrarsheikh Oct 18, 2025
27b6577
remove args
abrarsheikh Oct 18, 2025
4ca7534
type check
abrarsheikh Oct 19, 2025
cce78a1
Merge branch 'master' of github.com:ray-project/ray into SERVE-1134-a…
abrarsheikh Oct 21, 2025
e8680c4
remove docstrings
abrarsheikh Oct 21, 2025
618178f
fix import
abrarsheikh Oct 21, 2025
c14c981
xMerge branch 'master' into SERVE-1134-abrar-request_router
abrarsheikh Oct 21, 2025
06846ff
denormalize
abrarsheikh Oct 21, 2025
c46e347
add back docs
abrarsheikh Oct 22, 2025
b792a11
Merge branch 'master' of github.com:ray-project/ray into SERVE-1134-a…
abrarsheikh Oct 22, 2025
13ae9b1
fix
abrarsheikh Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 89 additions & 3 deletions python/ray/serve/_private/application_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
TargetCapacityDirection,
)
from ray.serve._private.config import DeploymentConfig
from ray.serve._private.constants import RAY_SERVE_ENABLE_TASK_EVENTS, SERVE_LOGGER_NAME
from ray.serve._private.constants import (
DEFAULT_AUTOSCALING_POLICY_NAME,
DEFAULT_REQUEST_ROUTER_PATH,
RAY_SERVE_ENABLE_TASK_EVENTS,
SERVE_LOGGER_NAME,
)
from ray.serve._private.deploy_utils import (
deploy_args_to_deployment_info,
get_app_code_version,
Expand All @@ -42,7 +47,7 @@
validate_route_prefix,
)
from ray.serve.api import ASGIAppReplicaWrapper
from ray.serve.config import AutoscalingConfig
from ray.serve.config import AutoscalingConfig, RequestRouterConfig
from ray.serve.exceptions import RayServeException
from ray.serve.generated.serve_pb2 import (
ApplicationStatus as ApplicationStatusProto,
Expand Down Expand Up @@ -570,6 +575,26 @@ def apply_app_config(
) or self._target_state.config.runtime_env.get("image_uri"):
ServeUsageTag.APP_CONTAINER_RUNTIME_ENV_USED.record("1")

deployment_to_autoscaling_policy_name = {}
for deployment in config.deployments:
# Since we are using configs to extract the autoscaling policy name, it is guaranteed that the type of the policy name is a string.
if isinstance(deployment.autoscaling_config, dict):
deployment_to_autoscaling_policy_name[
deployment.name
] = deployment.autoscaling_config.get("policy", {}).get(
"name", DEFAULT_AUTOSCALING_POLICY_NAME
)

deployment_to_request_router_cls = {}
for deployment in config.deployments:
# Since we are using configs to extract the request router cls, it is guaranteed that the type of the request router cls is a string.
if isinstance(deployment.request_router_config, dict):
deployment_to_request_router_cls[
deployment.name
] = deployment.request_router_config.get(
"request_router_class", DEFAULT_REQUEST_ROUTER_PATH
)

# Kick off new build app task
logger.info(f"Importing and building app '{self._name}'.")
build_app_obj_ref = build_serve_application.options(
Expand All @@ -581,6 +606,8 @@ def apply_app_config(
config.name,
config.args,
self._logging_config,
deployment_to_autoscaling_policy_name,
deployment_to_request_router_cls,
)
self._build_app_task_info = BuildAppTaskInfo(
obj_ref=build_app_obj_ref,
Expand Down Expand Up @@ -707,8 +734,21 @@ def _reconcile_build_app_task(self) -> Tuple[Optional[Dict], BuildAppStatus, str
)
for params in args
}
deployment_to_serialized_autoscaling_policy_def = {
params["deployment_name"]: params["serialized_autoscaling_policy_def"]
for params in args
if params["serialized_autoscaling_policy_def"] is not None
}
deployment_to_serialized_request_router_cls = {
params["deployment_name"]: params["serialized_request_router_cls"]
for params in args
if params["serialized_request_router_cls"] is not None
}
overrided_infos = override_deployment_info(
deployment_infos, self._build_app_task_info.config
deployment_infos,
self._build_app_task_info.config,
deployment_to_serialized_autoscaling_policy_def,
deployment_to_serialized_request_router_cls,
)
self._route_prefix = self._check_routes(overrided_infos)
return overrided_infos, BuildAppStatus.SUCCEEDED, ""
Expand Down Expand Up @@ -1180,6 +1220,8 @@ def build_serve_application(
name: str,
args: Dict,
logging_config: LoggingConfig,
deployment_to_autoscaling_policy_name: Dict[str, str],
deployment_to_request_router_cls: Dict[str, str],
) -> Tuple[Optional[List[Dict]], Optional[str]]:
"""Import and build a Serve application.

Expand All @@ -1191,6 +1233,8 @@ def build_serve_application(
without removing existing applications.
args: Arguments to be passed to the application builder.
logging_config: the logging config for the build app task.
deployment_to_autoscaling_policy_name: a dictionary mapping deployment names to autoscaling policy names
deployment_to_request_router_cls: a dictionary mapping deployment names to request router class names
Returns:
Deploy arguments: a list of deployment arguments if application
was built successfully, otherwise None.
Expand Down Expand Up @@ -1226,6 +1270,16 @@ def build_serve_application(
):
num_ingress_deployments += 1
is_ingress = deployment.name == built_app.ingress_deployment_name
deployment_to_serialized_autoscaling_policy_def = None
deployment_to_serialized_request_router_cls = None
if deployment.name in deployment_to_autoscaling_policy_name:
deployment_to_serialized_autoscaling_policy_def = cloudpickle.dumps(
import_attr(deployment_to_autoscaling_policy_name[deployment.name])
)
if deployment.name in deployment_to_request_router_cls:
deployment_to_serialized_request_router_cls = cloudpickle.dumps(
import_attr(deployment_to_request_router_cls[deployment.name])
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Autoscaling Policy Serialization Issue

The build_serve_application function serializes custom autoscaling policies and request router classes by reference instead of by value. This happens because it uses cloudpickle.dumps with import_attr but doesn't register the modules with cloudpickle.register_pickle_by_value, which can lead to ModuleNotFoundError when these classes are deserialized in different runtime environments.

Fix in Cursor Fix in Web

deploy_args_list.append(
get_deploy_args(
name=deployment._name,
Expand All @@ -1234,6 +1288,8 @@ def build_serve_application(
deployment_config=deployment._deployment_config,
version=code_version,
route_prefix="/" if is_ingress else None,
serialized_autoscaling_policy_def=deployment_to_serialized_autoscaling_policy_def,
serialized_request_router_cls=deployment_to_serialized_request_router_cls,
)
)
if num_ingress_deployments > 1:
Expand Down Expand Up @@ -1261,6 +1317,8 @@ def build_serve_application(
def override_deployment_info(
deployment_infos: Dict[str, DeploymentInfo],
override_config: Optional[ServeApplicationSchema],
deployment_to_serialized_autoscaling_policy_def: Optional[Dict[str, bytes]] = None,
deployment_to_serialized_request_router_cls: Optional[Dict[str, bytes]] = None,
) -> Dict[str, DeploymentInfo]:
"""Override deployment infos with options from app config.

Expand All @@ -1269,6 +1327,8 @@ def override_deployment_info(
deployment_infos: deployment info loaded from code
override_config: application config deployed by user with
options to override those loaded from code.
deployment_to_serialized_autoscaling_policy_def: serialized autoscaling policy def for each deployment
deployment_to_serialized_request_router_cls: serialized request router cls for each deployment

Returns: the updated deployment infos.

Expand Down Expand Up @@ -1315,6 +1375,16 @@ def override_deployment_info(
if autoscaling_config:
new_config.update(autoscaling_config)

if (
deployment_to_serialized_autoscaling_policy_def
and deployment_name in deployment_to_serialized_autoscaling_policy_def
):
# By setting the serialized policy def, AutoscalingConfig constructor will not
# try to import the policy from the string import path
new_config[
"_serialized_policy_def"
] = deployment_to_serialized_autoscaling_policy_def[deployment_name]

options["autoscaling_config"] = AutoscalingConfig(**new_config)

ServeUsageTag.AUTO_NUM_REPLICAS_USED.record("1")
Expand Down Expand Up @@ -1364,6 +1434,22 @@ def override_deployment_info(
)
override_options["replica_config"] = replica_config

if "request_router_config" in options:
request_router_config = options.get("request_router_config")
if request_router_config:
if (
deployment_to_serialized_request_router_cls
and deployment_name in deployment_to_serialized_request_router_cls
):
# By setting the serialized request router cls, RequestRouterConfig constructor will not
# try to import the request router cls from the string import path
request_router_config[
"_serialized_request_router_cls"
] = deployment_to_serialized_request_router_cls[deployment_name]
options["request_router_config"] = RequestRouterConfig(
**request_router_config
)

# Override deployment config options
options.pop("name", None)
original_options.update(options)
Expand Down
10 changes: 10 additions & 0 deletions python/ray/serve/_private/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ def to_proto(self):
if self.needs_pickle():
data["user_config"] = cloudpickle.dumps(data["user_config"])
if data.get("autoscaling_config"):
# By setting the serialized policy def, on the protobuf level, AutoscalingConfig constructor will not
# try to import the policy from the string import path when the protobuf is deserialized on the controller side
data["autoscaling_config"][
"_serialized_policy_def"
] = self.autoscaling_config._serialized_policy_def
data["autoscaling_config"] = AutoscalingConfigProto(
**data["autoscaling_config"]
)
Expand All @@ -258,6 +263,11 @@ def to_proto(self):
"Non-empty request_router_kwargs not supported"
f"for cross-language deployments. Got: {router_kwargs}"
)
# By setting the serialized request router cls, on the protobuf level, RequestRouterConfig constructor will not
# try to import the request router cls from the string import path when the protobuf is deserialized on the controller side
data["request_router_config"][
"_serialized_request_router_cls"
] = self.request_router_config._serialized_request_router_cls
data["request_router_config"] = RequestRouterConfigProto(
**data["request_router_config"]
)
Expand Down
26 changes: 25 additions & 1 deletion python/ray/serve/_private/deploy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import ray.util.serialization_addons
from ray.serve._private.common import DeploymentID
from ray.serve._private.config import DeploymentConfig, ReplicaConfig
from ray.serve._private.constants import SERVE_LOGGER_NAME
from ray.serve._private.constants import (
DEFAULT_AUTOSCALING_POLICY_NAME,
DEFAULT_REQUEST_ROUTER_PATH,
SERVE_LOGGER_NAME,
)
from ray.serve._private.deployment_info import DeploymentInfo
from ray.serve.schema import ServeApplicationSchema

Expand All @@ -22,6 +26,8 @@ def get_deploy_args(
deployment_config: Optional[Union[DeploymentConfig, Dict[str, Any]]] = None,
version: Optional[str] = None,
route_prefix: Optional[str] = None,
serialized_autoscaling_policy_def: Optional[bytes] = None,
serialized_request_router_cls: Optional[bytes] = None,
) -> Dict:
"""
Takes a deployment's configuration, and returns the arguments needed
Expand All @@ -44,6 +50,8 @@ def get_deploy_args(
"route_prefix": route_prefix,
"deployer_job_id": ray.get_runtime_context().get_job_id(),
"ingress": ingress,
"serialized_autoscaling_policy_def": serialized_autoscaling_policy_def,
"serialized_request_router_cls": serialized_request_router_cls,
}

return controller_deploy_args
Expand Down Expand Up @@ -98,11 +106,27 @@ def get_app_code_version(app_config: ServeApplicationSchema) -> str:
Returns: a hash of the import path and (application level) runtime env representing
the code version of the application.
"""
autoscaling_policy_names = [
deployment.autoscaling_config.get("policy", {}).get(
"name", DEFAULT_AUTOSCALING_POLICY_NAME
)
for deployment in app_config.deployments
if isinstance(deployment.autoscaling_config, dict)
]
request_router_cls_names = [
deployment.request_router_config.get(
"request_router_class", DEFAULT_REQUEST_ROUTER_PATH
)
for deployment in app_config.deployments
if isinstance(deployment.request_router_config, dict)
]
encoded = json.dumps(
{
"import_path": app_config.import_path,
"runtime_env": app_config.runtime_env,
"args": app_config.args,
"autoscaling_policy_names": autoscaling_policy_names,
"request_router_cls_names": request_router_cls_names,
},
sort_keys=True,
).encode("utf-8")
Expand Down
14 changes: 12 additions & 2 deletions python/ray/serve/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,14 @@ def __init__(self, **kwargs: dict[str, Any]):
Args:
**kwargs: Keyword arguments to pass to BaseModel.
"""
serialized_request_router_cls = kwargs.get(
"_serialized_request_router_cls", None
)
super().__init__(**kwargs)
self._serialize_request_router_cls()
if serialized_request_router_cls:
self._serialized_request_router_cls = serialized_request_router_cls
else:
self._serialize_request_router_cls()

def _serialize_request_router_cls(self) -> None:
"""Import and serialize request router class with cloudpickle.
Expand Down Expand Up @@ -281,8 +287,12 @@ def metrics_interval_s_deprecation_warning(cls, v: PositiveFloat) -> PositiveFlo
return v

def __init__(self, **kwargs):
serialized_policy_def = kwargs.get("_serialized_policy_def", None)
super().__init__(**kwargs)
self.serialize_policy()
if serialized_policy_def:
self._serialized_policy_def = serialized_policy_def
else:
self.serialize_policy()

def serialize_policy(self) -> None:
"""Serialize policy with cloudpickle.
Expand Down
28 changes: 28 additions & 0 deletions python/ray/serve/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -783,5 +783,33 @@ def test_deployment_contains_utils(serve_instance):
)


def test_deploy_use_custom_request_router(serve_instance):
    """Deploy an app configured with a custom request router via the CLI.

    Runs ``serve deploy`` on ``use_custom_request_router.yaml`` (located in the
    ``test_config_files`` directory next to this test module) and then polls
    the ``app1`` endpoint until it answers with the expected body.
    """
    expected_body = "hello_from_custom_request_router"
    config_dir = os.path.join(os.path.dirname(__file__), "test_config_files")
    config_file = os.path.join(config_dir, "use_custom_request_router.yaml")

    deploy_cmd = ["serve", "deploy", config_file]
    # stderr is folded into stdout so a failing deploy surfaces its full output.
    subprocess.check_output(deploy_cmd, stderr=subprocess.STDOUT)

    def app_replies_as_expected():
        # The application URL is resolved again on every poll attempt.
        app_url = get_application_url(app_name="app1")
        return httpx.post(f"{app_url}/").text == expected_body

    wait_for_condition(app_replies_as_expected)


def test_deploy_use_custom_autoscaling(serve_instance):
    """Deploy an app configured with a custom autoscaling policy via the CLI.

    Runs ``serve deploy`` on ``use_custom_autoscaling.yaml`` (located in the
    ``test_config_files`` directory next to this test module) and then polls
    the ``app1`` endpoint until it answers with the expected body.
    """
    expected_body = "hello_from_custom_autoscaling_policy"
    config_dir = os.path.join(os.path.dirname(__file__), "test_config_files")
    config_file = os.path.join(config_dir, "use_custom_autoscaling.yaml")

    deploy_cmd = ["serve", "deploy", config_file]
    # stderr is folded into stdout so a failing deploy surfaces its full output.
    subprocess.check_output(deploy_cmd, stderr=subprocess.STDOUT)

    def app_replies_as_expected():
        # The application URL is resolved again on every poll attempt.
        app_url = get_application_url(app_name="app1")
        return httpx.post(f"{app_url}/").text == expected_body

    wait_for_condition(app_replies_as_expected)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", "-s", __file__]))
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
applications:
- name: app1
route_prefix: /
import_path: ray.serve.tests.test_config_files.use_custom_autoscaling_policy:app
deployments:
- name: CustomAutoscalingPolicy
num_replicas: auto
ray_actor_options:
num_cpus: 0.0
autoscaling_config:
min_replicas: 1
max_replicas: 2
upscale_delay_s: 1
downscale_delay_s: 2
policy:
name: ray.serve.tests.test_config_files.use_custom_autoscaling_policy.custom_autoscaling_policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from ray import serve
from ray.serve._private.autoscaling_state import AutoscalingContext


def custom_autoscaling_policy(ctx: AutoscalingContext):
    """Return a fixed autoscaling decision; ``ctx`` is not inspected.

    Returns:
        The 2-tuple ``(2, {})``, with a fresh empty dict on every call.
        NOTE(review): presumably (target replica count, policy state) per the
        Serve autoscaling-policy contract -- confirm.
    """
    decision = (2, {})
    return decision


@serve.deployment
class CustomAutoscalingPolicy:
    """Deployment whose call handler returns a fixed greeting string."""

    def __call__(self):
        greeting = "hello_from_custom_autoscaling_policy"
        return greeting


# Bound application object exposed at module level.
# NOTE(review): presumably the target of the
# `...use_custom_autoscaling_policy:app` import_path in the YAML config -- confirm.
app = CustomAutoscalingPolicy.bind()
Loading