14 changes: 14 additions & 0 deletions .buildkite/llm.rayci.yml
@@ -46,4 +46,18 @@ steps:
    commands:
      - RAYCI_DISABLE_TEST_DB=1 bazel run //ci/ray_ci:test_in_docker -- //python/ray/llm/... //doc/... llm
        --python-version 3.11 --build-name llmgpubuild --only-tags gpu
        --except-tags multi_gpu_4
    depends_on: llmgpubuild

- label: "llm gpu tests (4 GPUs)"
key: "llm-gpu-tests-4gpu"
tags:
- llm
- gpu
instance_type: gpu-large
commands:
- RAYCI_DISABLE_TEST_DB=1 bazel run //ci/ray_ci:test_in_docker -- //doc/... llm
Contributor:

Should we add //python/ray/llm/... so potential multi_gpu_4 targets added there will be picked up too? (See the sketch after this hunk.)

        --python-version 3.11 --build-name llmgpubuild
        --only-tags multi_gpu_4
        --gpus 4
    depends_on: llmgpubuild
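
A sketch of the command with the reviewer's suggestion applied (hypothetical; it only adds //python/ray/llm/... to the flags already shown above):

      - RAYCI_DISABLE_TEST_DB=1 bazel run //ci/ray_ci:test_in_docker -- //python/ray/llm/... //doc/... llm
        --python-version 3.11 --build-name llmgpubuild
        --only-tags multi_gpu_4
        --gpus 4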
18 changes: 16 additions & 2 deletions doc/BUILD.bazel
@@ -342,11 +342,11 @@ filegroup(
    visibility = ["//doc:__subpackages__"],
)

# GPU Tests
# GPU Tests (standard GPU tests)
py_test_run_all_subdirectory(
    size = "large",
    include = ["source/llm/doc_code/serve/**/*.py"],
    exclude = [],
    exclude = ["source/llm/doc_code/serve/multi_gpu/**/*.py"],
    extra_srcs = [],
    data = ["source/llm/doc_code/serve/qwen/llm_config_example.yaml"],
    tags = [
@@ -356,6 +356,20 @@ py_test_run_all_subdirectory(
    ],
)

# Multi-GPU Tests (4+ GPUs)
py_test_run_all_subdirectory(
    size = "large",
    include = ["source/llm/doc_code/serve/multi_gpu/**/*.py"],
    exclude = [],
    extra_srcs = [],
    tags = [
        "exclusive",
        "gpu",
        "multi_gpu_4",
        "team:llm",
    ],
)

# --------------------------------------------------------------------
# Test all doc/source/data/doc_code/working-with-llms code included in rst/md files.
# --------------------------------------------------------------------
84 changes: 84 additions & 0 deletions doc/source/llm/doc_code/serve/multi_gpu/dp_basic_example.py
@@ -0,0 +1,84 @@
"""
This file serves as a documentation example and CI test for basic data parallel attention deployment.

Structure:
1. Monkeypatch setup: Ensures serve.run is non-blocking and removes accelerator requirements for CI testing.
2. Docs example (between __dp_basic_example_start/end__): Embedded in Sphinx docs via literalinclude.
3. Test validation (deployment status polling + cleanup)
"""

import time
from ray import serve
from ray.serve.schema import ApplicationStatus
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve import llm

_original_serve_run = serve.run
_original_build_dp_openai_app = llm.build_dp_openai_app


def _non_blocking_serve_run(app, **kwargs):
    """Forces blocking=False for testing"""
    kwargs["blocking"] = False
    return _original_serve_run(app, **kwargs)


def _testing_build_dp_openai_app(builder_config, **kwargs):
    """Removes accelerator requirements for testing"""
    if "llm_config" in builder_config:
        config = builder_config["llm_config"]
        if hasattr(config, "accelerator_type") and config.accelerator_type is not None:
            config.accelerator_type = None
    return _original_build_dp_openai_app(builder_config, **kwargs)


serve.run = _non_blocking_serve_run
llm.build_dp_openai_app = _testing_build_dp_openai_app

# __dp_basic_example_start__
from ray import serve
from ray.serve.llm import LLMConfig, build_dp_openai_app

# Configure the model with data parallel settings
config = LLMConfig(
    model_loading_config={
        "model_id": "Qwen/Qwen2.5-0.5B-Instruct"
    },
    engine_kwargs={
        "data_parallel_size": 2,  # Number of DP replicas
        "tensor_parallel_size": 1,  # TP size per replica
    },
    experimental_configs={
        # This is a temporary required config. We will remove this in future versions.
        "dp_size_per_node": 2,  # DP replicas per node
    },
)

app = build_dp_openai_app({
    "llm_config": config
})

serve.run(app, blocking=True)
# __dp_basic_example_end__

status = ApplicationStatus.NOT_STARTED
timeout_seconds = 300
start_time = time.time()

while (
    status != ApplicationStatus.RUNNING and time.time() - start_time < timeout_seconds
):
    status = serve.status().applications[SERVE_DEFAULT_APP_NAME].status

    if status in [ApplicationStatus.DEPLOY_FAILED, ApplicationStatus.UNHEALTHY]:
        raise AssertionError(f"Deployment failed with status: {status}")

    time.sleep(1)

if status != ApplicationStatus.RUNNING:
    raise AssertionError(
        f"Deployment failed to reach RUNNING status within {timeout_seconds}s. Current status: {status}"
    )

serve.shutdown()
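
Once the app reports RUNNING, the ingress serves an OpenAI-compatible API. A minimal client sketch follows; it assumes Serve's default HTTP address (http://localhost:8000), the /v1 route prefix, and the openai Python package, so adjust these for your deployment:

from openai import OpenAI

# Placeholder key: the OpenAI client requires one even if the local endpoint ignores it.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

response = client.chat.completions.create(
    model="Qwen/Qwen2.5-0.5B-Instruct",  # matches model_id in the LLMConfig above
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)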

135 changes: 135 additions & 0 deletions doc/source/llm/doc_code/serve/multi_gpu/dp_pd_example.py
@@ -0,0 +1,135 @@
"""
This file serves as a documentation example and CI test for data parallel + prefill-decode disaggregation.

Structure:
1. Monkeypatch setup: Ensures serve.run is non-blocking and removes accelerator requirements for CI testing.
2. Docs example (between __dp_pd_example_start/end__): Embedded in Sphinx docs via literalinclude.
3. Test validation (deployment status polling + cleanup)
"""

import time
from ray import serve
from ray.serve.schema import ApplicationStatus
from ray.serve._private.constants import SERVE_DEFAULT_APP_NAME
from ray.serve import llm
from ray.serve.llm.deployment import PDProxyServer
from ray.serve.llm.ingress import OpenAiIngress, make_fastapi_ingress

# Check if NIXL is available (required for NixlConnector)
try:
    import nixl  # noqa: F401
    NIXL_AVAILABLE = True
except ImportError:
    NIXL_AVAILABLE = False

if not NIXL_AVAILABLE:
    raise ImportError(
        "NIXL is required for this example but is not installed. "
        "Install it with: pip install nixl or uv pip install nixl"
    )

_original_serve_run = serve.run
_original_build_dp_deployment = llm.build_dp_deployment


def _non_blocking_serve_run(app, **kwargs):
    """Forces blocking=False for testing"""
    kwargs["blocking"] = False
    return _original_serve_run(app, **kwargs)


def _testing_build_dp_deployment(llm_config, **kwargs):
    """Removes accelerator requirements for testing"""
    if llm_config.accelerator_type is not None:
        llm_config.accelerator_type = None
    return _original_build_dp_deployment(llm_config, **kwargs)


serve.run = _non_blocking_serve_run
llm.build_dp_deployment = _testing_build_dp_deployment

# __dp_pd_example_start__
from ray import serve
from ray.serve.llm import LLMConfig, build_dp_deployment
from ray.serve.llm.deployment import PDProxyServer
from ray.serve.llm.ingress import OpenAiIngress, make_fastapi_ingress

# Configure prefill with data parallel attention
prefill_config = LLMConfig(
    model_loading_config={
        "model_id": "Qwen/Qwen2.5-0.5B-Instruct"
    },
    engine_kwargs={
        "data_parallel_size": 2,  # 2 DP replicas for prefill
        "tensor_parallel_size": 1,
        "kv_transfer_config": {
            "kv_connector": "NixlConnector",
            "kv_role": "kv_both",
        }
    },
    experimental_configs={
        "dp_size_per_node": 2,
    },
)

# Configure decode with data parallel attention
decode_config = LLMConfig(
    model_loading_config={
        "model_id": "Qwen/Qwen2.5-0.5B-Instruct"
    },
    engine_kwargs={
        "data_parallel_size": 2,  # 2 DP replicas for decode (adjusted for 4 GPU limit)
        "tensor_parallel_size": 1,
        "kv_transfer_config": {
            "kv_connector": "NixlConnector",
            "kv_role": "kv_both",
        }
    },
    experimental_configs={
        "dp_size_per_node": 2,
    },
)
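
# GPU accounting for this example (an illustrative note, assuming one GPU per engine rank):
#   prefill: data_parallel_size (2) x tensor_parallel_size (1) = 2 GPUs
#   decode:  data_parallel_size (2) x tensor_parallel_size (1) = 2 GPUs
# Total: 4 GPUs, which is why both sides use 2 DP replicas on a 4-GPU runner.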

# Build prefill and decode deployments with DP
prefill_deployment = build_dp_deployment(prefill_config, name_prefix="Prefill:")
decode_deployment = build_dp_deployment(decode_config, name_prefix="Decode:")

# Create PDProxyServer to coordinate between prefill and decode
proxy_options = PDProxyServer.get_deployment_options(prefill_config, decode_config)
proxy_deployment = serve.deployment(PDProxyServer).options(**proxy_options).bind(
    prefill_server=prefill_deployment,
    decode_server=decode_deployment,
)

# Create OpenAI-compatible ingress
ingress_options = OpenAiIngress.get_deployment_options([prefill_config, decode_config])
ingress_cls = make_fastapi_ingress(OpenAiIngress)
ingress_deployment = serve.deployment(ingress_cls).options(**ingress_options).bind(
    llm_deployments=[proxy_deployment]
)

# Deploy the application
serve.run(ingress_deployment, blocking=True)
# __dp_pd_example_end__

status = ApplicationStatus.NOT_STARTED
timeout_seconds = 300 # Longer timeout for DP+PD setup
start_time = time.time()

while (
    status != ApplicationStatus.RUNNING and time.time() - start_time < timeout_seconds
):
    status = serve.status().applications[SERVE_DEFAULT_APP_NAME].status

    if status in [ApplicationStatus.DEPLOY_FAILED, ApplicationStatus.UNHEALTHY]:
        raise AssertionError(f"Deployment failed with status: {status}")

    time.sleep(1)

if status != ApplicationStatus.RUNNING:
    raise AssertionError(
        f"Deployment failed to reach RUNNING status within {timeout_seconds}s. Current status: {status}"
    )

serve.shutdown()
