Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies = [
"uvicorn==0.34.0",
"xxhash==3.5.0",
"psutil==7.0.0",
"pyyaml>=6.0.2",
]

[project.scripts]
Expand Down
68 changes: 61 additions & 7 deletions src/tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from unittest.mock import MagicMock

import pytest
import yaml

from vllm_router.parsers import parser

Expand All @@ -25,28 +26,80 @@ def test_verify_required_args_provided_when_service_discovery_missing_raises_sys
parser.verify_required_args_provided(args_mock)


def test_load_initial_config_from_config_json_if_required_when_config_file_is_not_provided_returns_args_without_changes() -> (
def test_load_initial_config_from_config_file_if_required_when_config_files_are_not_provided_returns_args_without_changes() -> (
None
):
args_mock = MagicMock(example=True, dynamic_config_json=None)
args_mock = MagicMock(
example=True, dynamic_config_yaml=None, dynamic_config_json=None
)
assert (
parser.load_initial_config_from_config_json_if_required(MagicMock(), args_mock)
parser.load_initial_config_from_config_file_if_required(MagicMock(), args_mock)
== args_mock
)


def test_load_initial_config_from_config_json_if_required_when_config_file_is_provided_adds_values_to_args(
def test_load_initial_config_from_config_file_if_required_when_yaml_config_file_is_provided_adds_values_to_args(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Check that a YAML dynamic config file populates the parsed args.

    The nested ``static_models`` mapping is expected to come back as
    comma-joined ``static_backends`` / ``static_models`` /
    ``static_model_types`` strings, and ``static_aliases`` as
    ``alias:model`` pairs.
    """
    # One model served by two backends, one model served by a single backend.
    embedding_model = {
        "static_backends": [
            "https://endpoint1.example.com/bge-m3",
            "https://endpoint2.example.com/bge-m3",
        ],
        "static_model_type": "embeddings",
    }
    rerank_model = {
        "static_backends": [
            "https://endpoint3.example.com/bge-reranker-v2-m3",
        ],
        "static_model_type": "rerank",
    }
    yaml_config = {
        "routing_logic": "roundrobin",
        "service_discovery": "static",
        "callbacks": "module.custom.callback_handler",
        "static_models": {
            "bge-m3": embedding_model,
            "bge-reranker-v2-m3": rerank_model,
        },
        "static_aliases": {"text-embedding-3-small": "bge-m3"},
    }
    yaml_bytes = yaml.safe_dump(yaml_config).encode()

    cli_parser = argparse.ArgumentParser("test")
    for flag in ("--dynamic-config-yaml", "--dynamic-config-json"):
        cli_parser.add_argument(flag, type=str)

    with tempfile.NamedTemporaryFile() as config_file:
        config_file.write(yaml_bytes)
        # seek() flushes the buffered bytes so the loader can read them by name.
        config_file.seek(0)
        monkeypatch.setattr(
            sys, "argv", [sys.argv[0], "--dynamic-config-yaml", config_file.name]
        )
        loaded_args = parser.load_initial_config_from_config_file_if_required(
            cli_parser, cli_parser.parse_args()
        )

    expected_values = {
        "routing_logic": "roundrobin",
        "service_discovery": "static",
        "callbacks": "module.custom.callback_handler",
        "static_backends": "https://endpoint1.example.com/bge-m3,https://endpoint2.example.com/bge-m3,https://endpoint3.example.com/bge-reranker-v2-m3",
        "static_models": "bge-m3,bge-m3,bge-reranker-v2-m3",
        "static_model_types": "embeddings,embeddings,rerank",
        "static_aliases": "text-embedding-3-small:bge-m3",
    }
    for attribute, expected in expected_values.items():
        assert getattr(loaded_args, attribute) == expected


def test_load_initial_config_from_config_file_if_required_when_json_config_file_is_provided_adds_values_to_args(
monkeypatch: pytest.MonkeyPatch,
) -> None:
with tempfile.NamedTemporaryFile() as f:
monkeypatch.setattr(sys, "argv", [sys.argv[0], "--dynamic-config-json", f.name])
f.write(json.dumps({"routing_logic": "roundrobin"}).encode())
f.seek(0)
test_parser = argparse.ArgumentParser("test")
test_parser.add_argument("--routing-logic", type=str)
test_parser.add_argument("--dynamic-config-yaml", type=str)
test_parser.add_argument("--dynamic-config-json", type=str)
args = test_parser.parse_args()
args = parser.load_initial_config_from_config_json_if_required(
args = parser.load_initial_config_from_config_file_if_required(
test_parser, args
)
assert args.routing_logic == "roundrobin"
Expand All @@ -71,9 +124,10 @@ def test_load_initial_config_from_config_json_if_required_when_config_file_is_pr
f.seek(0)
test_parser = argparse.ArgumentParser("test")
test_parser.add_argument("--routing-logic", type=str)
test_parser.add_argument("--dynamic-config-yaml", type=str)
test_parser.add_argument("--dynamic-config-json", type=str)
args = test_parser.parse_args()
args = parser.load_initial_config_from_config_json_if_required(
args = parser.load_initial_config_from_config_file_if_required(
test_parser, args
)
assert args.routing_logic == "roundrobin"
Expand Down
37 changes: 32 additions & 5 deletions src/vllm_router/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ The router can be configured using command-line arguments. Below are the availab

### Dynamic Config Options

- `--dynamic-config-json`: The path to the json file containing the dynamic configuration.
- `--dynamic-config-yaml`: The path to the YAML file containing the dynamic configuration.
- `--dynamic-config-json`: The path to the JSON file containing the dynamic configuration.

### Sentry Options

Expand Down Expand Up @@ -104,8 +105,9 @@ different endpoints for each model type.

## Dynamic Router Config

The router can be configured dynamically using a json file when passing the `--dynamic-config-json` option.
The router will watch the json file for changes and update the configuration accordingly (every 10 seconds).
The router can be configured dynamically using a config file when passing the `--dynamic-config-yaml` or
`--dynamic-config-json` options. Please note that these are two mutually exclusive options.
The router will watch the config file for changes and update the configuration accordingly (every 10 seconds).

Currently, the dynamic config supports the following fields:

Expand All @@ -116,21 +118,46 @@ Currently, the dynamic config supports the following fields:

**Optional fields:**

- `callbacks`: The path to the callback instance extending CustomCallbackHandler.
- (When using `static` service discovery) `static_backends`: The URLs of static serving engines, separated by commas (e.g., `http://localhost:9001,http://localhost:9002,http://localhost:9003`).
- (When using `static` service discovery) `static_models`: The models running in the static serving engines, separated by commas (e.g., `model1,model2`).
- (When using `static` service discovery and if you enable the `--static-backend-health-checks` flag) `static_model_types`: The model types running in the static serving engines, separated by commas (e.g., `chat,chat`).
- (When using `k8s` service discovery) `k8s_port`: The port of vLLM processes when using K8s service discovery. Default is `8000`.
- (When using `k8s` service discovery) `k8s_namespace`: The namespace of vLLM pods when using K8s service discovery. Default is `default`.
- (When using `k8s` service discovery) `k8s_label_selector`: The label selector to filter vLLM pods when using K8s service discovery.
- `session_key`: The key (in the header) to identify a session when using session-based routing.

Here is an example dynamic config file:
Here is an example of a dynamic YAML config file:

```yaml
service_discovery: static
routing_logic: roundrobin
callbacks: module.custom.callback_handler
static_models:
  facebook/opt-125m:
    static_backends:
      - http://localhost:9001
      - http://localhost:9003
    static_model_type: completion
  meta-llama/Llama-3.1-8B-Instruct:
    static_backends:
      - http://localhost:9002
    static_model_type: chat
```

Here is an example of a dynamic JSON config file:

```json
{
"service_discovery": "static",
"routing_logic": "roundrobin",
"callbacks": "module.custom.callback_handler",
"static_backends": "http://localhost:9001,http://localhost:9002,http://localhost:9003",
"static_models": "facebook/opt-125m,meta-llama/Llama-3.1-8B-Instruct,facebook/opt-125m"
"static_models": "facebook/opt-125m,meta-llama/Llama-3.1-8B-Instruct,facebook/opt-125m",
"static_model_types": "completion,chat,completion"
}
```

Expand Down
7 changes: 6 additions & 1 deletion src/vllm_router/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,15 @@ def initialize_all(app: FastAPI, args):
)

# Initialize dynamic config watcher
if args.dynamic_config_yaml:
init_config = DynamicRouterConfig.from_args(args)
initialize_dynamic_config_watcher(
args.dynamic_config_yaml, "YAML", 10, init_config, app
)
if args.dynamic_config_json:
init_config = DynamicRouterConfig.from_args(args)
initialize_dynamic_config_watcher(
args.dynamic_config_json, 10, init_config, app
args.dynamic_config_json, "JSON", 10, init_config, app
)

if args.callbacks:
Expand Down
45 changes: 36 additions & 9 deletions src/vllm_router/dynamic_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import dataclasses
import json
import threading
import time
from dataclasses import dataclass
from typing import Optional
from typing import Any, Literal, Optional

from fastapi import FastAPI

from vllm_router.log import init_logger
from vllm_router.parsers.yaml_utils import (
read_and_process_yaml_config_file,
)
from vllm_router.routers.routing_logic import reconfigure_routing_logic
from vllm_router.service_discovery import (
ServiceDiscoveryType,
Expand Down Expand Up @@ -49,6 +53,7 @@ class DynamicRouterConfig:
# Service discovery configurations
static_backends: Optional[str] = None
static_models: Optional[str] = None
static_model_types: Optional[str] = None
static_aliases: Optional[str] = None
k8s_port: Optional[int] = None
k8s_namespace: Optional[str] = None
Expand All @@ -57,6 +62,9 @@ class DynamicRouterConfig:
# Routing logic configurations
session_key: Optional[str] = None

# Logging Options
callbacks: Optional[str] = None

# Batch API configurations
# TODO (ApostaC): Support dynamic reconfiguration of batch API
# enable_batch_api: bool
Expand All @@ -77,15 +85,23 @@ def from_args(args) -> "DynamicRouterConfig":
service_discovery=args.service_discovery,
static_backends=args.static_backends,
static_models=args.static_models,
static_model_types=args.static_model_types,
static_aliases=args.static_aliases,
k8s_port=args.k8s_port,
k8s_namespace=args.k8s_namespace,
k8s_label_selector=args.k8s_label_selector,
# Routing logic configurations
routing_logic=args.routing_logic,
session_key=args.session_key,
# Logging Options
callbacks=args.callbacks,
)

@staticmethod
def from_yaml(yaml_path: str) -> "DynamicRouterConfig":
    """Build a DynamicRouterConfig from the YAML config file at ``yaml_path``.

    The file is read via ``read_and_process_yaml_config_file`` and the
    mapping it returns is expanded into the dataclass's keyword arguments.
    """
    return DynamicRouterConfig(**read_and_process_yaml_config_file(yaml_path))

@staticmethod
def from_json(json_path: str) -> "DynamicRouterConfig":
with open(json_path, "r") as f:
Expand All @@ -98,12 +114,13 @@ def to_json_str(self) -> str:

class DynamicConfigWatcher(metaclass=SingletonMeta):
"""
Watches a config json file for changes and updates the DynamicRouterConfig accordingly.
Watches a config file for changes and updates the DynamicRouterConfig accordingly.
"""

def __init__(
self,
config_json: str,
config_path: str,
config_file_type: Literal["YAML", "JSON"],
watch_interval: int,
init_config: DynamicRouterConfig,
app: FastAPI,
Expand All @@ -112,11 +129,13 @@ def __init__(
Initializes the DynamicConfigWatcher with the given config file, watch interval, initial config and app.

Args:
config_json: the path to the json file containing the dynamic configuration
config_path: the path to the config file containing the dynamic configuration
config_file_type: the config file type containing the dynamic configuration (YAML or JSON)
watch_interval: the interval in seconds at which to check the config file for changes
app: the fastapi app to reconfigure
"""
self.config_json = config_json
self.config_path = config_path
self.config_file_type = config_file_type
self.watch_interval = watch_interval
self.current_config = init_config
self.app = app
Expand Down Expand Up @@ -205,7 +224,12 @@ def _watch_worker(self):
"""
while self.running:
try:
config = DynamicRouterConfig.from_json(self.config_json)
if self.config_file_type == "YAML":
config = DynamicRouterConfig.from_yaml(self.config_path)
elif self.config_file_type == "JSON":
config = DynamicRouterConfig.from_json(self.config_path)
else:
raise Exception("Unsupported config file type.")
if config != self.current_config:
logger.info(
"DynamicConfigWatcher: Config changed, reconfiguring..."
Expand All @@ -228,15 +252,18 @@ def close(self):


def initialize_dynamic_config_watcher(
config_json: str,
config_path: str,
config_file_type: Literal["YAML", "JSON"],
watch_interval: int,
init_config: DynamicRouterConfig,
app: FastAPI,
):
"""
Initializes the DynamicConfigWatcher with the given config json and watch interval.
Initializes the DynamicConfigWatcher with the given config path, file type and watch interval.
"""
return DynamicConfigWatcher(config_json, watch_interval, init_config, app)
return DynamicConfigWatcher(
config_path, config_file_type, watch_interval, init_config, app
)


def get_dynamic_config_watcher() -> DynamicConfigWatcher:
Expand Down
Loading