From fa6cfd7adcd026621f6727ed560d3391bd09c1fd Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Mon, 25 Aug 2025 10:49:19 +0200 Subject: [PATCH 01/19] add logging Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index 7dc0fd651..52b7b790b 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -571,6 +571,7 @@ def _get_model_label(self, pod) -> Optional[str]: def _watch_engines(self): while self.running: try: + logger.info(f"Watching engines: {self.available_engines}") for event in self.k8s_watcher.stream( self.k8s_api.list_namespaced_pod, namespace=self.namespace, From a929a873cf905564462e72d4e926b4baf6d81dc8 Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Mon, 25 Aug 2025 15:57:08 +0200 Subject: [PATCH 02/19] add unit test for service discovery Signed-off-by: jonoillar --- .../test_k8s_pod_ip_service_discovery.py | 211 ++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 src/tests/test_k8s_pod_ip_service_discovery.py diff --git a/src/tests/test_k8s_pod_ip_service_discovery.py b/src/tests/test_k8s_pod_ip_service_discovery.py new file mode 100644 index 000000000..da1d04fba --- /dev/null +++ b/src/tests/test_k8s_pod_ip_service_discovery.py @@ -0,0 +1,211 @@ +from unittest.mock import MagicMock, patch +import pytest +import time + +from vllm_router.service_discovery import K8sPodIPServiceDiscovery, EndpointInfo + + +@pytest.fixture +def mock_app(): + """Mock FastAPI app instance.""" + app = MagicMock() + app.state = MagicMock() + return app + + +@pytest.fixture +def mock_k8s_dependencies(): + """Mock all Kubernetes dependencies.""" + with patch('vllm_router.service_discovery.config') as mock_config, \ + patch('vllm_router.service_discovery.client.CoreV1Api') as mock_api_class, \ + patch('vllm_router.service_discovery.watch.Watch') as mock_watch_class, \ + patch('vllm_router.service_discovery.requests') as mock_requests: + # Mock config loading + mock_config.load_incluster_config.return_value = None + + # Mock API client + mock_api = MagicMock() + mock_api_class.return_value = mock_api + + # Mock watcher + mock_watcher = MagicMock() + mock_watch_class.return_value = mock_watcher + + # Mock HTTP responses + mock_response = MagicMock() + mock_response.json.return_value = {"data": [{"id": "test-model"}]} + mock_response.raise_for_status.return_value = None + mock_requests.get.return_value = mock_response + + yield { + 'config': mock_config, + 'api': mock_api, + 'watcher': mock_watcher, + 'requests': mock_requests + } + + +def create_mock_pod_event(event_type, pod_name, pod_ip, ready=True, terminating=False, model_label="test-model"): + """Helper method to create a mock Kubernetes pod event.""" + pod = MagicMock() + pod.metadata.name = pod_name + pod.metadata.labels = {"model": model_label} if model_label else {} + if terminating: + pod.metadata.deletion_timestamp = "2024-01-01T00:00:00Z" + else: + pod.metadata.deletion_timestamp = None + + pod.status.pod_ip = pod_ip + pod.status.container_statuses = [MagicMock(ready=ready)] if ready else [] + + return { + "type": event_type, + "object": pod + } + + +def test_scenario_1_two_pods_present(mock_app, mock_k8s_dependencies): + """Test scenario 1: 2 model pods present and running.""" + # Create a generator that yields events and then stops + def mock_stream_generator(): + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, 
model_label="model-1") + yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + # Stop after yielding the events + raise Exception("Simulated timeout") + + mock_k8s_dependencies['watcher'].stream.return_value = mock_stream_generator() + + # Mock sleep mode check to return False + with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + discovery = K8sPodIPServiceDiscovery( + app=mock_app, + namespace="test-namespace", + port="8000" + ) + + # Give the watcher thread time to process the events + time.sleep(0.1) # hardcoded 0.1 so that while the watcher sleeps after failing, this sleeping is exhausted + + # Check that both engines are in available_engines + assert len(discovery.available_engines) == 2 + assert "engine_1" in discovery.available_engines + assert "engine_2" in discovery.available_engines + + # Verify the endpoint info + engine_1 = discovery.available_engines["engine_1"] + engine_2 = discovery.available_engines["engine_2"] + + assert isinstance(engine_1, EndpointInfo) + assert isinstance(engine_2, EndpointInfo) + assert engine_1.url == "http://10.0.0.1:8000" + assert engine_2.url == "http://10.0.0.2:8000" + assert engine_1.model_names == ["test-model"] + assert engine_2.model_names == ["test-model"] + assert engine_1.model_label == "model-1" + assert engine_2.model_label == "model-2" + + discovery.close() + + +def test_scenario_2_pod_deletion(mock_app, mock_k8s_dependencies): + """Test scenario 2: 2 pods present, then 1 gets deleted.""" + # Mock the watcher stream to return 2 ADDED events followed by 1 DELETED event + # Create a generator that yields events and then stops + def mock_stream_generator(): + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event("DELETED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + # Stop after yielding the events + raise Exception("Simulated timeout") + + mock_k8s_dependencies['watcher'].stream.return_value = mock_stream_generator() + + # Mock sleep mode check to return False + with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + discovery = K8sPodIPServiceDiscovery( + app=mock_app, + namespace="test-namespace", + port="8000" + ) + + # Give the watcher thread time to process all events + time.sleep(0.3) + + # Check that only engine_2 remains in available_engines + assert len(discovery.available_engines) == 1 + assert "engine_1" not in discovery.available_engines + assert "engine_2" in discovery.available_engines + + # Verify the remaining endpoint info + engine_2 = discovery.available_engines["engine_2"] + assert isinstance(engine_2, EndpointInfo) + assert engine_2.url == "http://10.0.0.2:8000" + assert engine_2.model_names == ["test-model"] + assert engine_2.model_label == "model-2" + + discovery.close() + + +def test_scenario_3_pod_addition_after_timeout(mock_app, mock_k8s_dependencies, caplog): + """Test scenario 3: 2 pods present, then 1 more added after timeout.""" + + # Create a generator that yields different events on each iteration + def mock_stream_generator(): + # First iteration: 2 pods + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + + # Simulate timeout by raising StopIteration + raise 
StopIteration() + + def mock_stream_generator_second(): + # Second iteration: 3 pods (including the new one) + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event("ADDED", "engine_3", "10.0.0.3", ready=True, model_label="model-3") + + # Simulate timeout + raise StopIteration() + + # Mock the watcher stream to use our generator + mock_k8s_dependencies['watcher'].stream.side_effect = [ + mock_stream_generator(), + mock_stream_generator_second() + ] + + # Mock sleep mode check to return False + with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + discovery = K8sPodIPServiceDiscovery( + app=mock_app, + namespace="test-namespace", + port="8000" + ) + + # Give the watcher thread time to process the first iteration + time.sleep(0.5) + discovery.running = False # Stop the while loop + + # Check that both engines are in available_engines after first iteration + assert len(discovery.available_engines) == 2 + assert "engine_1" in discovery.available_engines + assert "engine_2" in discovery.available_engines + assert "engine_3" not in discovery.available_engines + discovery.running = True # Restart the while loop + + # Give more time for the second iteration to process + time.sleep(0.5) + + # Check that all three engines are in available_engines after second iteration + assert len(discovery.available_engines) == 3 + assert "engine_1" in discovery.available_engines + assert "engine_2" in discovery.available_engines + assert "engine_3" in discovery.available_engines + + # Verify the new endpoint info + engine_3 = discovery.available_engines["engine_3"] + assert isinstance(engine_3, EndpointInfo) + assert engine_3.url == "http://10.0.0.3:8000" + assert engine_3.model_names == ["test-model"] + assert engine_3.model_label == "model-3" + + discovery.close() From e82d3f918c27067df19bca4eed2d6d8d6453f04a Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Mon, 25 Aug 2025 16:30:24 +0200 Subject: [PATCH 03/19] add logs and tests Signed-off-by: jonoillar --- .../test_k8s_pod_ip_service_discovery.py | 97 +++++++++++++++++++ src/vllm_router/service_discovery.py | 3 +- 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/src/tests/test_k8s_pod_ip_service_discovery.py b/src/tests/test_k8s_pod_ip_service_discovery.py index da1d04fba..b010be1ee 100644 --- a/src/tests/test_k8s_pod_ip_service_discovery.py +++ b/src/tests/test_k8s_pod_ip_service_discovery.py @@ -209,3 +209,100 @@ def mock_stream_generator_second(): assert engine_3.model_label == "model-3" discovery.close() + + +def test_scenario_4_slow_models_call_blocks_deletion(mock_app, mock_k8s_dependencies): + """Test scenario 4: Slow /v1/models call blocks deletion event processing.""" + + # Track how many times we've been called to simulate different behaviors + should_call_false = False + + def mock_slow_requests_get(url, headers=None): + if should_call_false: + # Third call to engine_1's /v1/models - simulate slow response + time.sleep(40) # Simulate a slow call that would exceed timeout in real scenario + raise Exception("Simulated slow response") + else: + # Normal fast response for other calls + mock_response = MagicMock() + mock_response.json.return_value = {"data": [{"id": "test-model"}]} + mock_response.raise_for_status.return_value = None + return mock_response + + # Create generators for each watch iteration + def 
mock_stream_generator_first(): + # First iteration: 2 pods added + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + raise Exception("Simulated timeout") + + def mock_stream_generator_second(): + # Second iteration: same 2 pods added again (no change) + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + raise Exception("Simulated timeout") + + def mock_stream_generator_third(): + # Third iteration: engine_1 slow call, engine_2 deleted + yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event("DELETED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + raise Exception("Simulated timeout") + + # Mock the watcher stream to use our generators + mock_k8s_dependencies['watcher'].stream.side_effect = [ + mock_stream_generator_first(), + mock_stream_generator_second(), + mock_stream_generator_third() + ] + + # Mock the requests.get to simulate slow response + mock_k8s_dependencies['requests'].get.side_effect = mock_slow_requests_get + + # Mock sleep mode check to return False + with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + discovery = K8sPodIPServiceDiscovery( + app=mock_app, + namespace="test-namespace", + port="8000" + ) + + # First iteration: Give time for both engines to be added + time.sleep(0.5) + discovery.running = False + + # Check that both engines are in available_engines after first iteration + assert len(discovery.available_engines) == 2 + assert "engine_1" in discovery.available_engines + assert "engine_2" in discovery.available_engines + discovery.running = True + + # Second iteration: Give time for the second iteration (should be no change) + time.sleep(0.5) + discovery.running = False + + # Check that both engines are still in available_engines after second iteration + assert len(discovery.available_engines) == 2 + assert "engine_1" in discovery.available_engines + assert "engine_2" in discovery.available_engines + + # Third iteration: Give time for the third iteration + # The slow call to engine_1's /v1/models should block processing + # and prevent the DELETED event for engine_2 from being processed + time.sleep(0.3) + + # Check that engine_2 is still in available_engines because the DELETED event + # was not processed due to the slow /v1/models call blocking the stream + assert len(discovery.available_engines) == 2 + assert "engine_1" in discovery.available_engines + assert "engine_2" in discovery.available_engines # Should still be here! 
+ + # Verify that engine_1 is still there (even though the /v1/models call was slow) + engine_1 = discovery.available_engines["engine_1"] + engine_2 = discovery.available_engines["engine_2"] + + assert isinstance(engine_1, EndpointInfo) + assert isinstance(engine_2, EndpointInfo) + assert engine_1.url == "http://10.0.0.1:8000" + assert engine_2.url == "http://10.0.0.2:8000" + + discovery.close() \ No newline at end of file diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index 52b7b790b..def9d292e 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -503,6 +503,7 @@ def _get_model_names(self, pod_ip) -> List[str]: List of model names available on the serving engine, including both base models and adapters """ url = f"http://{pod_ip}:{self.port}/v1/models" + logger.debug(f"Get model names for pod {pod_ip}") try: headers = None if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): @@ -571,7 +572,7 @@ def _get_model_label(self, pod) -> Optional[str]: def _watch_engines(self): while self.running: try: - logger.info(f"Watching engines: {self.available_engines}") + logger.debug(f"Watching engines: {self.available_engines}") for event in self.k8s_watcher.stream( self.k8s_api.list_namespaced_pod, namespace=self.namespace, From 6cef81ec20c91c9288c33e5cc30728968d5ddef4 Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Mon, 25 Aug 2025 16:51:21 +0200 Subject: [PATCH 04/19] add failing test Signed-off-by: jonoillar --- src/tests/test_k8s_pod_ip_service_discovery.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/tests/test_k8s_pod_ip_service_discovery.py b/src/tests/test_k8s_pod_ip_service_discovery.py index b010be1ee..132b37959 100644 --- a/src/tests/test_k8s_pod_ip_service_discovery.py +++ b/src/tests/test_k8s_pod_ip_service_discovery.py @@ -284,11 +284,14 @@ def mock_stream_generator_third(): assert len(discovery.available_engines) == 2 assert "engine_1" in discovery.available_engines assert "engine_2" in discovery.available_engines + should_call_false = True + discovery.running = True # Third iteration: Give time for the third iteration # The slow call to engine_1's /v1/models should block processing # and prevent the DELETED event for engine_2 from being processed - time.sleep(0.3) + time.sleep(0.5) + discovery.running = False # Check that engine_2 is still in available_engines because the DELETED event # was not processed due to the slow /v1/models call blocking the stream From a139da5b0cd7489d12e17b6adeb1babebb2c637b Mon Sep 17 00:00:00 2001 From: jonoillar Date: Tue, 26 Aug 2025 16:48:31 +0200 Subject: [PATCH 05/19] fix rebasing Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 275 ++++++++++++++++++++------- 1 file changed, 202 insertions(+), 73 deletions(-) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index def9d292e..4eee7882b 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -24,7 +24,9 @@ import aiohttp import requests +import queue from kubernetes import client, config, watch +from collections import OrderedDict from vllm_router import utils from vllm_router.log import init_logger @@ -383,12 +385,19 @@ def __init__( self.k8s_api = client.CoreV1Api() self.k8s_watcher = watch.Watch() + # Event queue and processor + self.event_queue = queue.Queue() + self.event_processor_task = None + self.resource_version = None + # Start watching engines self.running = True self.watcher_thread = 
threading.Thread(target=self._watch_engines, daemon=True) self.watcher_thread.start() self.prefill_model_labels = prefill_model_labels self.decode_model_labels = decode_model_labels + self.failing_counter = 0 # TODO: remove when implementation works + self.use_ressource_version = False # TODO: remove when implementation works @staticmethod def _check_pod_ready(container_statuses): @@ -409,7 +418,7 @@ def _is_pod_terminating(pod): """ return pod.metadata.deletion_timestamp is not None - def _get_engine_sleep_status(self, pod_ip) -> Optional[bool]: + async def _get_engine_sleep_status_async(self, pod_ip) -> Optional[bool]: """ Get the engine sleeping status by querying the engine's '/is_sleeping' endpoint. @@ -426,10 +435,14 @@ def _get_engine_sleep_status(self, pod_ip) -> Optional[bool]: if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): logger.info("Using vllm server authentication") headers = {"Authorization": f"Bearer {VLLM_API_KEY}"} - response = requests.get(url, headers=headers) - response.raise_for_status() - sleep = response.json()["is_sleeping"] - return sleep + + # Use aiohttp for async HTTP requests + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + response.raise_for_status() + data = await response.json() + sleep = data["is_sleeping"] + return sleep except Exception as e: logger.warning( f"Failed to get the sleep status for engine at {url} - sleep status is set to `False`: {e}" @@ -491,7 +504,7 @@ def remove_sleep_label(self, pod_name): except client.rest.ApiException as e: logger.error(f"Error removing sleeping label: {e}") - def _get_model_names(self, pod_ip) -> List[str]: + async def _get_model_names_async(self, pod_ip) -> List[str]: """ Get the model names of the serving engine pod by querying the pod's '/v1/models' endpoint. 
@@ -503,29 +516,37 @@ def _get_model_names(self, pod_ip) -> List[str]: List of model names available on the serving engine, including both base models and adapters """ url = f"http://{pod_ip}:{self.port}/v1/models" - logger.debug(f"Get model names for pod {pod_ip}") try: headers = None + self.failing_counter += 1 + logger.info(f"{self.failing_counter=}") if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): + if self.failing_counter > 3: + await asyncio.sleep(60) + VLLM_API_KEY = "wrong_key_jklkjlkj" logger.info("Using vllm server authentication") headers = {"Authorization": f"Bearer {VLLM_API_KEY}"} - response = requests.get(url, headers=headers) - response.raise_for_status() - models = response.json()["data"] - - # Collect all model names, including both base models and adapters - model_names = [] - for model in models: - model_id = model["id"] - model_names.append(model_id) - - logger.info(f"Found models on pod {pod_ip}: {model_names}") - return model_names + + # Use aiohttp for async HTTP requests + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + response.raise_for_status() + data = await response.json() + models = data["data"] + + # Collect all model names, including both base models and adapters + model_names = [] + for model in models: + model_id = model["id"] + model_names.append(model_id) + + logger.info(f"Found models on pod {pod_ip}: {model_names}") + return model_names except Exception as e: logger.error(f"Failed to get model names from {url}: {e}") return [] - def _get_model_info(self, pod_ip) -> Dict[str, ModelInfo]: + async def _get_model_info_async(self, pod_ip) -> Dict[str, ModelInfo]: """ Get detailed model information from the serving engine pod. @@ -541,16 +562,20 @@ def _get_model_info(self, pod_ip) -> Dict[str, ModelInfo]: if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): logger.info("Using vllm server authentication") headers = {"Authorization": f"Bearer {VLLM_API_KEY}"} - response = requests.get(url, headers=headers) - response.raise_for_status() - models = response.json()["data"] - # Create a dictionary of model information - model_info = {} - for model in models: - model_id = model["id"] - model_info[model_id] = ModelInfo.from_dict(model) - - return model_info + + # Use aiohttp for async HTTP requests + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + response.raise_for_status() + data = await response.json() + models = data["data"] + # Create a dictionary of model information + model_info = {} + for model in models: + model_id = model["id"] + model_info[model_id] = ModelInfo.from_dict(model) + + return model_info except Exception as e: logger.error(f"Failed to get model info from {url}: {e}") return {} @@ -570,55 +595,151 @@ def _get_model_label(self, pod) -> Optional[str]: return pod.metadata.labels.get("model") def _watch_engines(self): + """ + Watcher thread that only enqueues events. All processing is done by the async event processor. 
+ """ while self.running: try: - logger.debug(f"Watching engines: {self.available_engines}") + logger.info(f"K8s watcher started{self.get_endpoint_info()}") + logger.info("time out is 30") + logger.info("Jon latest version v2") + + # Use resource version for efficient watching + watch_params = { + "namespace": self.namespace, + "label_selector": self.label_selector, + "timeout_seconds": 30, + } + if self.resource_version and self.use_ressource_version: + watch_params["resource_version"] = self.resource_version + + + logger.debug(f"{watch_params=}") for event in self.k8s_watcher.stream( self.k8s_api.list_namespaced_pod, - namespace=self.namespace, - label_selector=self.label_selector, - timeout_seconds=self.watcher_timeout_seconds, + **watch_params ): - pod = event["object"] - event_type = event["type"] - pod_name = pod.metadata.name - pod_ip = pod.status.pod_ip - - # Check if pod is terminating - is_pod_terminating = self._is_pod_terminating(pod) - is_container_ready = self._check_pod_ready( - pod.status.container_statuses - ) + logger.debug(f"Watching pod {event["object"].metadata.name} as event type: {event['type']}") + # Update resource version + self.resource_version = event["object"].metadata.resource_version - # Pod is ready if container is ready and pod is not terminating - is_pod_ready = is_container_ready and not is_pod_terminating + # Enqueue event by key (pod_name) to avoid duplicates + self._enqueue_event(event) + + except Exception as e: + logger.error(f"K8s watcher error: {e}") + time.sleep(0.5) - if is_pod_ready: - model_names = self._get_model_names(pod_ip) - model_label = self._get_model_label(pod) - else: - model_names = [] - model_label = None + def _enqueue_event(self, event: dict): + """ + Enqueue event by pod name key. If a newer event for the same pod exists, + replace the older event. + """ + pod_name = event["object"].metadata.name + # Create a key-based queue using OrderedDict + if not hasattr(self, '_event_queue_dict'): + self._event_queue_dict = OrderedDict() + + # Add/update event in the ordered dict + self._event_queue_dict[pod_name] = event + + # Put the pod name in the queue for processing + try: + self.event_queue.put_nowait(pod_name) + except queue.Full: + logger.warning(f"Event queue is full, dropping event for pod {pod_name}") - # Record pod status for debugging - if is_container_ready and is_pod_terminating: - logger.info( - f"Pod {pod_name} has ready containers but is terminating - marking as unavailable" - ) + async def _start_event_processor(self): + """ + Start the async event processor. + """ + if self.event_processor_task is None: + self.event_processor_task = asyncio.create_task(self._process_events()) - self._on_engine_update( - pod_name, - pod_ip, - event_type, - is_pod_ready, - model_names, - model_label, - ) + async def _process_events(self): + """ + Async event processor that handles events from the queue. 
+ """ + while self.running: + try: + # Get pod name from queue + pod_name = await asyncio.get_event_loop().run_in_executor( + None, self.event_queue.get, True, 1.0 + ) + + # Get the event from our ordered dict + event = self._event_queue_dict.pop(pod_name, None) + if event is None: + continue + + # Process the event asynchronously + await self._process_single_event(event) + + except queue.Empty: + continue except Exception as e: - logger.error(f"K8s watcher error: {e}") - time.sleep(0.5) + logger.error(f"Event processor error: {e}") + await asyncio.sleep(0.1) + + async def _process_single_event(self, event: dict): + """ + Process a single event asynchronously. + """ + pod = event["object"] + event_type = event["type"] + pod_name = pod.metadata.name + pod_ip = pod.status.pod_ip + + logger.info(f"pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}") + + # Preprocess the event to get all necessary information + preprocessed_data = await self._preprocess_event(pod, pod_ip) + + # Call the async engine update handler + await self._on_engine_update_async( + pod_name, + pod_ip, + event_type, + preprocessed_data["is_pod_ready"], + preprocessed_data["model_names"], + preprocessed_data["model_label"], + ) + + async def _preprocess_event(self, pod, pod_ip: str) -> dict: + """ + Preprocess event data to extract all necessary information. + This method contains the logic that was previously inline in _watch_engines. + """ + # Check if pod is terminating + is_pod_terminating = self._is_pod_terminating(pod) + is_container_ready = self._check_pod_ready( + pod.status.container_statuses + ) + + logger.info(f"{is_container_ready=}") + # Pod is ready if container is ready and pod is not terminating + is_pod_ready = is_container_ready and not is_pod_terminating - def _add_engine( + if is_pod_ready: + model_names = await self._get_model_names_async(pod_ip) + model_label = self._get_model_label(pod) + else: + model_names = [] + model_label = None + + # Record pod status for debugging + if is_container_ready and is_pod_terminating: + logger.info( + f"Pod {pod.metadata.name} has ready containers but is terminating - marking as unavailable" + ) + + return { + "is_pod_ready": is_pod_ready, + "model_names": model_names, + "model_label": model_label, + } + + async def _add_engine_async( self, engine_name: str, engine_ip: str, model_names: List[str], model_label: str ): logger.info( @@ -627,11 +748,11 @@ def _add_engine( ) # Get detailed model information - model_info = self._get_model_info(engine_ip) + model_info = await self._get_model_info_async(engine_ip) # Check if engine is enabled with sleep mode and set engine sleep status if self._check_engine_sleep_mode(engine_name): - sleep_status = self._get_engine_sleep_status(engine_ip) + sleep_status = await self._get_engine_sleep_status_async(engine_ip) else: sleep_status = False @@ -656,7 +777,7 @@ def _delete_engine(self, engine_name: str): with self.available_engines_lock: del self.available_engines[engine_name] - def _on_engine_update( + async def _on_engine_update_async( self, engine_name: str, engine_ip: Optional[str], @@ -675,7 +796,7 @@ def _on_engine_update( if not model_names: return - self._add_engine(engine_name, engine_ip, model_names, model_label) + await self._add_engine_async(engine_name, engine_ip, model_names, model_label) elif event == "DELETED": if engine_name not in self.available_engines: @@ -688,7 +809,7 @@ def _on_engine_update( return if is_pod_ready and model_names: - self._add_engine(engine_name, engine_ip, model_names, model_label) 
+            await self._add_engine_async(engine_name, engine_ip, model_names, model_label)
             return
 
         if (
@@ -723,6 +844,11 @@ def close(self):
         """
         self.running = False
         self.k8s_watcher.stop()
+
+        # Cancel the event processor task
+        if self.event_processor_task:
+            self.event_processor_task.cancel()
+
         self.watcher_thread.join()
 
     async def initialize_client_sessions(self) -> None:
@@ -730,6 +856,9 @@ async def initialize_client_sessions(self) -> None:
         Initialize aiohttp ClientSession objects for prefill and decode endpoints.
         This must be called from an async context during app startup.
         """
+        # Start the event processor
+        await self._start_event_processor()
+
         if (
             self.prefill_model_labels is not None
             and self.decode_model_labels is not None

From d31030470f6c88799a6daff1aaed13bc00a771d6 Mon Sep 17 00:00:00 2001
From: Jon OILLARBURU
Date: Tue, 26 Aug 2025 15:56:17 +0200
Subject: [PATCH 06/19] remove unused use_ressource_version flag

Signed-off-by: jonoillar
---
 src/vllm_router/service_discovery.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py
index 4eee7882b..7f566af81 100644
--- a/src/vllm_router/service_discovery.py
+++ b/src/vllm_router/service_discovery.py
@@ -397,7 +397,6 @@ def __init__(
         self.prefill_model_labels = prefill_model_labels
         self.decode_model_labels = decode_model_labels
         self.failing_counter = 0  # TODO: remove when implementation works
-        self.use_ressource_version = False  # TODO: remove when implementation works
 
     @staticmethod
     def _check_pod_ready(container_statuses):
@@ -601,16 +600,14 @@ def _watch_engines(self):
         while self.running:
             try:
                 logger.info(f"K8s watcher started: {self.get_endpoint_info()}")
-                logger.info("time out is 30")
-                logger.info("Jon latest version v2")
-
+
                 # Use resource version for efficient watching
                 watch_params = {
                     "namespace": self.namespace,
                     "label_selector": self.label_selector,
                     "timeout_seconds": 30,
                 }
-                if self.resource_version and self.use_ressource_version:
+                if self.resource_version:
                     watch_params["resource_version"] = self.resource_version
 
From 5b75522cd3b4707db7832638de280299c926fb13 Mon Sep 17 00:00:00 2001
From: Jon OILLARBURU
Date: Tue, 26 Aug 2025 16:03:56 +0200
Subject: [PATCH 07/19] change logging

Signed-off-by: jonoillar
---
 src/vllm_router/service_discovery.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py
index 7f566af81..699f4be9e 100644
--- a/src/vllm_router/service_discovery.py
+++ b/src/vllm_router/service_discovery.py
@@ -599,8 +599,7 @@ def _watch_engines(self):
         while self.running:
             try:
-                logger.info(f"K8s watcher started: {self.get_endpoint_info()}")
-
+                logger.debug(f"K8s watcher started: {self.get_endpoint_info()}")
                 # Use resource version for efficient watching
                 watch_params = {
                     "namespace": self.namespace,
@@ -611,12 +610,10 @@ def _watch_engines(self):
                     watch_params["resource_version"] = self.resource_version
 
-
-                logger.debug(f"{watch_params=}")
                 for event in self.k8s_watcher.stream(
                     self.k8s_api.list_namespaced_pod,
                     **watch_params
                 ):
-                    logger.debug(f"Watching pod {event['object'].metadata.name} as event type: {event['type']}")
                     # Update resource version
                     self.resource_version = event["object"].metadata.resource_version
 
@@ -687,7 +684,7 @@ async def _process_single_event(self, event: dict):
         pod_name = pod.metadata.name
         pod_ip = pod.status.pod_ip
 
-        logger.info(f"pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}")
+        
logger.info(f"Processing event: pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}") # Preprocess the event to get all necessary information preprocessed_data = await self._preprocess_event(pod, pod_ip) @@ -704,8 +701,7 @@ async def _process_single_event(self, event: dict): async def _preprocess_event(self, pod, pod_ip: str) -> dict: """ - Preprocess event data to extract all necessary information. - This method contains the logic that was previously inline in _watch_engines. + Preprocess event data to extract information about model names/labels/pod readiness """ # Check if pod is terminating is_pod_terminating = self._is_pod_terminating(pod) From 203f30296a8304b79a3854adb80e39eb10300556 Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Tue, 26 Aug 2025 16:04:37 +0200 Subject: [PATCH 08/19] remove failing counter Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index 699f4be9e..71a796110 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -396,7 +396,6 @@ def __init__( self.watcher_thread.start() self.prefill_model_labels = prefill_model_labels self.decode_model_labels = decode_model_labels - self.failing_counter = 0 # TODO: remove when implementation works @staticmethod def _check_pod_ready(container_statuses): @@ -517,15 +516,9 @@ async def _get_model_names_async(self, pod_ip) -> List[str]: url = f"http://{pod_ip}:{self.port}/v1/models" try: headers = None - self.failing_counter += 1 - logger.info(f"{self.failing_counter=}") if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): - if self.failing_counter > 3: - await asyncio.sleep(60) - VLLM_API_KEY = "wrong_key_jklkjlkj" logger.info("Using vllm server authentication") headers = {"Authorization": f"Bearer {VLLM_API_KEY}"} - # Use aiohttp for async HTTP requests async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: From 2675c5cea40288adab8558302a4da39c43c4426c Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Tue, 26 Aug 2025 16:10:02 +0200 Subject: [PATCH 09/19] fix log to debug Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index 71a796110..ae3eb0a00 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -677,7 +677,7 @@ async def _process_single_event(self, event: dict): pod_name = pod.metadata.name pod_ip = pod.status.pod_ip - logger.info(f"Processing event: pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}") + logger.debug(f"Processing event: pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}") # Preprocess the event to get all necessary information preprocessed_data = await self._preprocess_event(pod, pod_ip) From c4296f5dbc31d99e3134f52bd25d75b5c57241d3 Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Tue, 26 Aug 2025 16:21:01 +0200 Subject: [PATCH 10/19] remove useless log Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index ae3eb0a00..1e6941999 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -701,8 +701,6 @@ async def _preprocess_event(self, pod, pod_ip: str) -> 
dict: is_container_ready = self._check_pod_ready( pod.status.container_statuses ) - - logger.info(f"{is_container_ready=}") # Pod is ready if container is ready and pod is not terminating is_pod_ready = is_container_ready and not is_pod_terminating From 8fd62a5844a337171d50caa991eb757c3763515c Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Tue, 26 Aug 2025 16:25:06 +0200 Subject: [PATCH 11/19] black Signed-off-by: jonoillar --- .../test_k8s_pod_ip_service_discovery.py | 158 +++++++++++------- src/vllm_router/service_discovery.py | 46 ++--- 2 files changed, 125 insertions(+), 79 deletions(-) diff --git a/src/tests/test_k8s_pod_ip_service_discovery.py b/src/tests/test_k8s_pod_ip_service_discovery.py index 132b37959..e3fb2814d 100644 --- a/src/tests/test_k8s_pod_ip_service_discovery.py +++ b/src/tests/test_k8s_pod_ip_service_discovery.py @@ -16,10 +16,12 @@ def mock_app(): @pytest.fixture def mock_k8s_dependencies(): """Mock all Kubernetes dependencies.""" - with patch('vllm_router.service_discovery.config') as mock_config, \ - patch('vllm_router.service_discovery.client.CoreV1Api') as mock_api_class, \ - patch('vllm_router.service_discovery.watch.Watch') as mock_watch_class, \ - patch('vllm_router.service_discovery.requests') as mock_requests: + with ( + patch("vllm_router.service_discovery.config") as mock_config, + patch("vllm_router.service_discovery.client.CoreV1Api") as mock_api_class, + patch("vllm_router.service_discovery.watch.Watch") as mock_watch_class, + patch("vllm_router.service_discovery.requests") as mock_requests, + ): # Mock config loading mock_config.load_incluster_config.return_value = None @@ -38,14 +40,21 @@ def mock_k8s_dependencies(): mock_requests.get.return_value = mock_response yield { - 'config': mock_config, - 'api': mock_api, - 'watcher': mock_watcher, - 'requests': mock_requests + "config": mock_config, + "api": mock_api, + "watcher": mock_watcher, + "requests": mock_requests, } -def create_mock_pod_event(event_type, pod_name, pod_ip, ready=True, terminating=False, model_label="test-model"): +def create_mock_pod_event( + event_type, + pod_name, + pod_ip, + ready=True, + terminating=False, + model_label="test-model", +): """Helper method to create a mock Kubernetes pod event.""" pod = MagicMock() pod.metadata.name = pod_name @@ -58,33 +67,37 @@ def create_mock_pod_event(event_type, pod_name, pod_ip, ready=True, terminating= pod.status.pod_ip = pod_ip pod.status.container_statuses = [MagicMock(ready=ready)] if ready else [] - return { - "type": event_type, - "object": pod - } + return {"type": event_type, "object": pod} def test_scenario_1_two_pods_present(mock_app, mock_k8s_dependencies): """Test scenario 1: 2 model pods present and running.""" + # Create a generator that yields events and then stops def mock_stream_generator(): - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2" + ) # Stop after yielding the events raise Exception("Simulated timeout") - mock_k8s_dependencies['watcher'].stream.return_value = mock_stream_generator() + mock_k8s_dependencies["watcher"].stream.return_value = mock_stream_generator() # Mock sleep mode check to return False - with patch.object(K8sPodIPServiceDiscovery, 
'_check_engine_sleep_mode', return_value=False): + with patch.object( + K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False + ): discovery = K8sPodIPServiceDiscovery( - app=mock_app, - namespace="test-namespace", - port="8000" + app=mock_app, namespace="test-namespace", port="8000" ) # Give the watcher thread time to process the events - time.sleep(0.1) # hardcoded 0.1 so that while the watcher sleeps after failing, this sleeping is exhausted + time.sleep( + 0.1 + ) # hardcoded 0.1 so that while the watcher sleeps after failing, this sleeping is exhausted # Check that both engines are in available_engines assert len(discovery.available_engines) == 2 @@ -109,23 +122,30 @@ def mock_stream_generator(): def test_scenario_2_pod_deletion(mock_app, mock_k8s_dependencies): """Test scenario 2: 2 pods present, then 1 gets deleted.""" + # Mock the watcher stream to return 2 ADDED events followed by 1 DELETED event # Create a generator that yields events and then stops def mock_stream_generator(): - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") - yield create_mock_pod_event("DELETED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2" + ) + yield create_mock_pod_event( + "DELETED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) # Stop after yielding the events raise Exception("Simulated timeout") - mock_k8s_dependencies['watcher'].stream.return_value = mock_stream_generator() + mock_k8s_dependencies["watcher"].stream.return_value = mock_stream_generator() # Mock sleep mode check to return False - with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + with patch.object( + K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False + ): discovery = K8sPodIPServiceDiscovery( - app=mock_app, - namespace="test-namespace", - port="8000" + app=mock_app, namespace="test-namespace", port="8000" ) # Give the watcher thread time to process all events @@ -152,45 +172,55 @@ def test_scenario_3_pod_addition_after_timeout(mock_app, mock_k8s_dependencies, # Create a generator that yields different events on each iteration def mock_stream_generator(): # First iteration: 2 pods - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2" + ) # Simulate timeout by raising StopIteration raise StopIteration() def mock_stream_generator_second(): # Second iteration: 3 pods (including the new one) - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") - yield create_mock_pod_event("ADDED", "engine_3", "10.0.0.3", ready=True, model_label="model-3") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "ADDED", "engine_2", "10.0.0.2", 
ready=True, model_label="model-2" + ) + yield create_mock_pod_event( + "ADDED", "engine_3", "10.0.0.3", ready=True, model_label="model-3" + ) # Simulate timeout raise StopIteration() # Mock the watcher stream to use our generator - mock_k8s_dependencies['watcher'].stream.side_effect = [ + mock_k8s_dependencies["watcher"].stream.side_effect = [ mock_stream_generator(), - mock_stream_generator_second() + mock_stream_generator_second(), ] # Mock sleep mode check to return False - with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + with patch.object( + K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False + ): discovery = K8sPodIPServiceDiscovery( - app=mock_app, - namespace="test-namespace", - port="8000" + app=mock_app, namespace="test-namespace", port="8000" ) # Give the watcher thread time to process the first iteration time.sleep(0.5) - discovery.running = False # Stop the while loop + discovery.running = False # Stop the while loop # Check that both engines are in available_engines after first iteration assert len(discovery.available_engines) == 2 assert "engine_1" in discovery.available_engines assert "engine_2" in discovery.available_engines assert "engine_3" not in discovery.available_engines - discovery.running = True # Restart the while loop + discovery.running = True # Restart the while loop # Give more time for the second iteration to process time.sleep(0.5) @@ -220,7 +250,9 @@ def test_scenario_4_slow_models_call_blocks_deletion(mock_app, mock_k8s_dependen def mock_slow_requests_get(url, headers=None): if should_call_false: # Third call to engine_1's /v1/models - simulate slow response - time.sleep(40) # Simulate a slow call that would exceed timeout in real scenario + time.sleep( + 40 + ) # Simulate a slow call that would exceed timeout in real scenario raise Exception("Simulated slow response") else: # Normal fast response for other calls @@ -232,38 +264,50 @@ def mock_slow_requests_get(url, headers=None): # Create generators for each watch iteration def mock_stream_generator_first(): # First iteration: 2 pods added - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2" + ) raise Exception("Simulated timeout") def mock_stream_generator_second(): # Second iteration: same 2 pods added again (no change) - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2" + ) raise Exception("Simulated timeout") def mock_stream_generator_third(): # Third iteration: engine_1 slow call, engine_2 deleted - yield create_mock_pod_event("ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1") - yield create_mock_pod_event("DELETED", "engine_2", "10.0.0.2", ready=True, model_label="model-2") + yield create_mock_pod_event( + "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1" + ) + yield create_mock_pod_event( + "DELETED", "engine_2", 
"10.0.0.2", ready=True, model_label="model-2" + ) raise Exception("Simulated timeout") # Mock the watcher stream to use our generators - mock_k8s_dependencies['watcher'].stream.side_effect = [ + mock_k8s_dependencies["watcher"].stream.side_effect = [ mock_stream_generator_first(), mock_stream_generator_second(), - mock_stream_generator_third() + mock_stream_generator_third(), ] # Mock the requests.get to simulate slow response - mock_k8s_dependencies['requests'].get.side_effect = mock_slow_requests_get + mock_k8s_dependencies["requests"].get.side_effect = mock_slow_requests_get # Mock sleep mode check to return False - with patch.object(K8sPodIPServiceDiscovery, '_check_engine_sleep_mode', return_value=False): + with patch.object( + K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False + ): discovery = K8sPodIPServiceDiscovery( - app=mock_app, - namespace="test-namespace", - port="8000" + app=mock_app, namespace="test-namespace", port="8000" ) # First iteration: Give time for both engines to be added @@ -308,4 +352,4 @@ def mock_stream_generator_third(): assert engine_1.url == "http://10.0.0.1:8000" assert engine_2.url == "http://10.0.0.2:8000" - discovery.close() \ No newline at end of file + discovery.close() diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index 1e6941999..fc93e22da 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -433,7 +433,7 @@ async def _get_engine_sleep_status_async(self, pod_ip) -> Optional[bool]: if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): logger.info("Using vllm server authentication") headers = {"Authorization": f"Bearer {VLLM_API_KEY}"} - + # Use aiohttp for async HTTP requests async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: @@ -554,7 +554,7 @@ async def _get_model_info_async(self, pod_ip) -> Dict[str, ModelInfo]: if VLLM_API_KEY := os.getenv("VLLM_API_KEY"): logger.info("Using vllm server authentication") headers = {"Authorization": f"Bearer {VLLM_API_KEY}"} - + # Use aiohttp for async HTTP requests async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers) as response: @@ -602,17 +602,15 @@ def _watch_engines(self): if self.resource_version: watch_params["resource_version"] = self.resource_version - for event in self.k8s_watcher.stream( - self.k8s_api.list_namespaced_pod, - **watch_params + self.k8s_api.list_namespaced_pod, **watch_params ): # Update resource version self.resource_version = event["object"].metadata.resource_version # Enqueue event by key (pod_name) to avoid duplicates self._enqueue_event(event) - + except Exception as e: logger.error(f"K8s watcher error: {e}") time.sleep(0.5) @@ -624,12 +622,12 @@ def _enqueue_event(self, event: dict): """ pod_name = event["object"].metadata.name # Create a key-based queue using OrderedDict - if not hasattr(self, '_event_queue_dict'): + if not hasattr(self, "_event_queue_dict"): self._event_queue_dict = OrderedDict() - + # Add/update event in the ordered dict self._event_queue_dict[pod_name] = event - + # Put the pod name in the queue for processing try: self.event_queue.put_nowait(pod_name) @@ -653,15 +651,15 @@ async def _process_events(self): pod_name = await asyncio.get_event_loop().run_in_executor( None, self.event_queue.get, True, 1.0 ) - + # Get the event from our ordered dict event = self._event_queue_dict.pop(pod_name, None) if event is None: continue - + # Process the event asynchronously await 
self._process_single_event(event) - + except queue.Empty: continue except Exception as e: @@ -677,11 +675,13 @@ async def _process_single_event(self, event: dict): pod_name = pod.metadata.name pod_ip = pod.status.pod_ip - logger.debug(f"Processing event: pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}") + logger.debug( + f"Processing event: pod_name: {pod_name} pod_ip: {pod_ip} event_type: {event_type}" + ) # Preprocess the event to get all necessary information preprocessed_data = await self._preprocess_event(pod, pod_ip) - + # Call the async engine update handler await self._on_engine_update_async( pod_name, @@ -698,9 +698,7 @@ async def _preprocess_event(self, pod, pod_ip: str) -> dict: """ # Check if pod is terminating is_pod_terminating = self._is_pod_terminating(pod) - is_container_ready = self._check_pod_ready( - pod.status.container_statuses - ) + is_container_ready = self._check_pod_ready(pod.status.container_statuses) # Pod is ready if container is ready and pod is not terminating is_pod_ready = is_container_ready and not is_pod_terminating @@ -780,7 +778,9 @@ async def _on_engine_update_async( if not model_names: return - await self._add_engine_async(engine_name, engine_ip, model_names, model_label) + await self._add_engine_async( + engine_name, engine_ip, model_names, model_label + ) elif event == "DELETED": if engine_name not in self.available_engines: @@ -793,7 +793,9 @@ async def _on_engine_update_async( return if is_pod_ready and model_names: - await self._add_engine_async(engine_name, engine_ip, model_names, model_label) + await self._add_engine_async( + engine_name, engine_ip, model_names, model_label + ) return if ( @@ -828,11 +830,11 @@ def close(self): """ self.running = False self.k8s_watcher.stop() - + # Cancel the event processor task if self.event_processor_task: self.event_processor_task.cancel() - + self.watcher_thread.join() async def initialize_client_sessions(self) -> None: @@ -842,7 +844,7 @@ async def initialize_client_sessions(self) -> None: """ # Start the event processor await self._start_event_processor() - + if ( self.prefill_model_labels is not None and self.decode_model_labels is not None From da3379a511eb001f1bbe821351b9c3cdfe7a64a1 Mon Sep 17 00:00:00 2001 From: Jon OILLARBURU Date: Tue, 26 Aug 2025 16:33:11 +0200 Subject: [PATCH 12/19] test Signed-off-by: jonoillar From c83fcba84e5acf644ba65b88e8614efafac6aaec Mon Sep 17 00:00:00 2001 From: jonoillar Date: Tue, 26 Aug 2025 16:34:57 +0200 Subject: [PATCH 13/19] test 2 Signed-off-by: jonoillar From e3a916e23c4c5d6ccf33a3a28932c263a648a2b1 Mon Sep 17 00:00:00 2001 From: jonoillar Date: Tue, 26 Aug 2025 16:50:10 +0200 Subject: [PATCH 14/19] empty commit Signed-off-by: jonoillar From f50b921d09c91af1cc76fc09493062dc54eb2bdc Mon Sep 17 00:00:00 2001 From: jonoillar Date: Wed, 27 Aug 2025 10:12:58 +0200 Subject: [PATCH 15/19] add thread locking Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index fc93e22da..cb35715cd 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -387,6 +387,8 @@ def __init__( # Event queue and processor self.event_queue = queue.Queue() + self._event_queue_dict = OrderedDict() + self._event_queue_dict_lock = threading.Lock() self.event_processor_task = None self.resource_version = None @@ -621,12 +623,10 @@ def _enqueue_event(self, event: dict): replace 
the older event. """ pod_name = event["object"].metadata.name - # Create a key-based queue using OrderedDict - if not hasattr(self, "_event_queue_dict"): - self._event_queue_dict = OrderedDict() # Add/update event in the ordered dict - self._event_queue_dict[pod_name] = event + with self._event_queue_dict_lock: + self._event_queue_dict[pod_name] = event # Put the pod name in the queue for processing try: @@ -653,7 +653,8 @@ async def _process_events(self): ) # Get the event from our ordered dict - event = self._event_queue_dict.pop(pod_name, None) + with self._event_queue_dict_lock: + event = self._event_queue_dict.pop(pod_name, None) if event is None: continue From 862a26844def7e4e95c6b8f517f97710d6bd87fb Mon Sep 17 00:00:00 2001 From: jonoillar Date: Wed, 27 Aug 2025 10:15:51 +0200 Subject: [PATCH 16/19] rename methods Signed-off-by: jonoillar --- src/vllm_router/service_discovery.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py index cb35715cd..14dc568a9 100644 --- a/src/vllm_router/service_discovery.py +++ b/src/vllm_router/service_discovery.py @@ -504,7 +504,7 @@ def remove_sleep_label(self, pod_name): except client.rest.ApiException as e: logger.error(f"Error removing sleeping label: {e}") - async def _get_model_names_async(self, pod_ip) -> List[str]: + async def _get_model_names(self, pod_ip) -> List[str]: """ Get the model names of the serving engine pod by querying the pod's '/v1/models' endpoint. @@ -540,7 +540,7 @@ async def _get_model_names_async(self, pod_ip) -> List[str]: logger.error(f"Failed to get model names from {url}: {e}") return [] - async def _get_model_info_async(self, pod_ip) -> Dict[str, ModelInfo]: + async def _get_model_info(self, pod_ip) -> Dict[str, ModelInfo]: """ Get detailed model information from the serving engine pod. 
From 862a26844def7e4e95c6b8f517f97710d6bd87fb Mon Sep 17 00:00:00 2001
From: jonoillar
Date: Wed, 27 Aug 2025 10:15:51 +0200
Subject: [PATCH 16/19] rename methods

Signed-off-by: jonoillar
---
 src/vllm_router/service_discovery.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py
index cb35715cd..14dc568a9 100644
--- a/src/vllm_router/service_discovery.py
+++ b/src/vllm_router/service_discovery.py
@@ -504,7 +504,7 @@ def remove_sleep_label(self, pod_name):
         except client.rest.ApiException as e:
             logger.error(f"Error removing sleeping label: {e}")

-    async def _get_model_names_async(self, pod_ip) -> List[str]:
+    async def _get_model_names(self, pod_ip) -> List[str]:
         """
         Get the model names of the serving engine pod by querying the pod's
         '/v1/models' endpoint.
@@ -540,7 +540,7 @@ async def _get_model_names_async(self, pod_ip) -> List[str]:
             logger.error(f"Failed to get model names from {url}: {e}")
             return []

-    async def _get_model_info_async(self, pod_ip) -> Dict[str, ModelInfo]:
+    async def _get_model_info(self, pod_ip) -> Dict[str, ModelInfo]:
         """
         Get detailed model information from the serving engine pod.
@@ -704,7 +704,7 @@ async def _preprocess_event(self, pod, pod_ip: str) -> dict:
         is_pod_ready = is_container_ready and not is_pod_terminating

         if is_pod_ready:
-            model_names = await self._get_model_names_async(pod_ip)
+            model_names = await self._get_model_names(pod_ip)
             model_label = self._get_model_label(pod)
         else:
             model_names = []
@@ -731,7 +731,7 @@ async def _add_engine_async(
         )

         # Get detailed model information
-        model_info = await self._get_model_info_async(engine_ip)
+        model_info = await self._get_model_info(engine_ip)

         # Check if engine is enabled with sleep mode and set engine sleep status
         if self._check_engine_sleep_mode(engine_name):

From b5bad7c2f773da465ce369663ef24f89dc138446 Mon Sep 17 00:00:00 2001
From: jonoillar
Date: Wed, 27 Aug 2025 10:34:06 +0200
Subject: [PATCH 17/19] remove useless test

Signed-off-by: jonoillar
---
 .../test_k8s_pod_ip_service_discovery.py | 355 ------------------
 1 file changed, 355 deletions(-)
 delete mode 100644 src/tests/test_k8s_pod_ip_service_discovery.py

diff --git a/src/tests/test_k8s_pod_ip_service_discovery.py b/src/tests/test_k8s_pod_ip_service_discovery.py
deleted file mode 100644
index e3fb2814d..000000000
--- a/src/tests/test_k8s_pod_ip_service_discovery.py
+++ /dev/null
@@ -1,355 +0,0 @@
-from unittest.mock import MagicMock, patch
-import pytest
-import time
-
-from vllm_router.service_discovery import K8sPodIPServiceDiscovery, EndpointInfo
-
-
-@pytest.fixture
-def mock_app():
-    """Mock FastAPI app instance."""
-    app = MagicMock()
-    app.state = MagicMock()
-    return app
-
-
-@pytest.fixture
-def mock_k8s_dependencies():
-    """Mock all Kubernetes dependencies."""
-    with (
-        patch("vllm_router.service_discovery.config") as mock_config,
-        patch("vllm_router.service_discovery.client.CoreV1Api") as mock_api_class,
-        patch("vllm_router.service_discovery.watch.Watch") as mock_watch_class,
-        patch("vllm_router.service_discovery.requests") as mock_requests,
-    ):
-        # Mock config loading
-        mock_config.load_incluster_config.return_value = None
-
-        # Mock API client
-        mock_api = MagicMock()
-        mock_api_class.return_value = mock_api
-
-        # Mock watcher
-        mock_watcher = MagicMock()
-        mock_watch_class.return_value = mock_watcher
-
-        # Mock HTTP responses
-        mock_response = MagicMock()
-        mock_response.json.return_value = {"data": [{"id": "test-model"}]}
-        mock_response.raise_for_status.return_value = None
-        mock_requests.get.return_value = mock_response
-
-        yield {
-            "config": mock_config,
-            "api": mock_api,
-            "watcher": mock_watcher,
-            "requests": mock_requests,
-        }
-
-
-def create_mock_pod_event(
-    event_type,
-    pod_name,
-    pod_ip,
-    ready=True,
-    terminating=False,
-    model_label="test-model",
-):
-    """Helper method to create a mock Kubernetes pod event."""
-    pod = MagicMock()
-    pod.metadata.name = pod_name
-    pod.metadata.labels = {"model": model_label} if model_label else {}
-    if terminating:
-        pod.metadata.deletion_timestamp = "2024-01-01T00:00:00Z"
-    else:
-        pod.metadata.deletion_timestamp = None
-
-    pod.status.pod_ip = pod_ip
-    pod.status.container_statuses = [MagicMock(ready=ready)] if ready else []
-
-    return {"type": event_type, "object": pod}
-
-
-def test_scenario_1_two_pods_present(mock_app, mock_k8s_dependencies):
-    """Test scenario 1: 2 model pods present and running."""
-
-    # Create a generator that yields events and then stops
-    def mock_stream_generator():
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-        # Stop after yielding the events
-        raise Exception("Simulated timeout")
-
-    mock_k8s_dependencies["watcher"].stream.return_value = mock_stream_generator()
-
-    # Mock sleep mode check to return False
-    with patch.object(
-        K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False
-    ):
-        discovery = K8sPodIPServiceDiscovery(
-            app=mock_app, namespace="test-namespace", port="8000"
-        )
-
-        # Give the watcher thread time to process the events
-        time.sleep(
-            0.1
-        )  # hardcoded 0.1 so that while the watcher sleeps after failing, this sleeping is exhausted
-
-        # Check that both engines are in available_engines
-        assert len(discovery.available_engines) == 2
-        assert "engine_1" in discovery.available_engines
-        assert "engine_2" in discovery.available_engines
-
-        # Verify the endpoint info
-        engine_1 = discovery.available_engines["engine_1"]
-        engine_2 = discovery.available_engines["engine_2"]
-
-        assert isinstance(engine_1, EndpointInfo)
-        assert isinstance(engine_2, EndpointInfo)
-        assert engine_1.url == "http://10.0.0.1:8000"
-        assert engine_2.url == "http://10.0.0.2:8000"
-        assert engine_1.model_names == ["test-model"]
-        assert engine_2.model_names == ["test-model"]
-        assert engine_1.model_label == "model-1"
-        assert engine_2.model_label == "model-2"
-
-        discovery.close()
-
-
-def test_scenario_2_pod_deletion(mock_app, mock_k8s_dependencies):
-    """Test scenario 2: 2 pods present, then 1 gets deleted."""
-
-    # Mock the watcher stream to return 2 ADDED events followed by 1 DELETED event
-    # Create a generator that yields events and then stops
-    def mock_stream_generator():
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-        yield create_mock_pod_event(
-            "DELETED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        # Stop after yielding the events
-        raise Exception("Simulated timeout")
-
-    mock_k8s_dependencies["watcher"].stream.return_value = mock_stream_generator()
-
-    # Mock sleep mode check to return False
-    with patch.object(
-        K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False
-    ):
-        discovery = K8sPodIPServiceDiscovery(
-            app=mock_app, namespace="test-namespace", port="8000"
-        )
-
-        # Give the watcher thread time to process all events
-        time.sleep(0.3)
-
-        # Check that only engine_2 remains in available_engines
-        assert len(discovery.available_engines) == 1
-        assert "engine_1" not in discovery.available_engines
-        assert "engine_2" in discovery.available_engines
-
-        # Verify the remaining endpoint info
-        engine_2 = discovery.available_engines["engine_2"]
-        assert isinstance(engine_2, EndpointInfo)
-        assert engine_2.url == "http://10.0.0.2:8000"
-        assert engine_2.model_names == ["test-model"]
-        assert engine_2.model_label == "model-2"
-
-        discovery.close()
-
-
-def test_scenario_3_pod_addition_after_timeout(mock_app, mock_k8s_dependencies, caplog):
-    """Test scenario 3: 2 pods present, then 1 more added after timeout."""
-
-    # Create a generator that yields different events on each iteration
-    def mock_stream_generator():
-        # First iteration: 2 pods
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-
-        # Simulate timeout by raising StopIteration
-        raise StopIteration()
-
-    def mock_stream_generator_second():
-        # Second iteration: 3 pods (including the new one)
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_3", "10.0.0.3", ready=True, model_label="model-3"
-        )
-
-        # Simulate timeout
-        raise StopIteration()
-
-    # Mock the watcher stream to use our generator
-    mock_k8s_dependencies["watcher"].stream.side_effect = [
-        mock_stream_generator(),
-        mock_stream_generator_second(),
-    ]
-
-    # Mock sleep mode check to return False
-    with patch.object(
-        K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False
-    ):
-        discovery = K8sPodIPServiceDiscovery(
-            app=mock_app, namespace="test-namespace", port="8000"
-        )
-
-        # Give the watcher thread time to process the first iteration
-        time.sleep(0.5)
-        discovery.running = False  # Stop the while loop
-
-        # Check that both engines are in available_engines after first iteration
-        assert len(discovery.available_engines) == 2
-        assert "engine_1" in discovery.available_engines
-        assert "engine_2" in discovery.available_engines
-        assert "engine_3" not in discovery.available_engines
-        discovery.running = True  # Restart the while loop
-
-        # Give more time for the second iteration to process
-        time.sleep(0.5)
-
-        # Check that all three engines are in available_engines after second iteration
-        assert len(discovery.available_engines) == 3
-        assert "engine_1" in discovery.available_engines
-        assert "engine_2" in discovery.available_engines
-        assert "engine_3" in discovery.available_engines
-
-        # Verify the new endpoint info
-        engine_3 = discovery.available_engines["engine_3"]
-        assert isinstance(engine_3, EndpointInfo)
-        assert engine_3.url == "http://10.0.0.3:8000"
-        assert engine_3.model_names == ["test-model"]
-        assert engine_3.model_label == "model-3"
-
-        discovery.close()
-
-
-def test_scenario_4_slow_models_call_blocks_deletion(mock_app, mock_k8s_dependencies):
-    """Test scenario 4: Slow /v1/models call blocks deletion event processing."""
-
-    # Track how many times we've been called to simulate different behaviors
-    should_call_false = False
-
-    def mock_slow_requests_get(url, headers=None):
-        if should_call_false:
-            # Third call to engine_1's /v1/models - simulate slow response
-            time.sleep(
-                40
-            )  # Simulate a slow call that would exceed timeout in real scenario
-            raise Exception("Simulated slow response")
-        else:
-            # Normal fast response for other calls
-            mock_response = MagicMock()
-            mock_response.json.return_value = {"data": [{"id": "test-model"}]}
-            mock_response.raise_for_status.return_value = None
-            return mock_response
-
-    # Create generators for each watch iteration
-    def mock_stream_generator_first():
-        # First iteration: 2 pods added
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-        raise Exception("Simulated timeout")
-
-    def mock_stream_generator_second():
-        # Second iteration: same 2 pods added again (no change)
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "ADDED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-        raise Exception("Simulated timeout")
-
-    def mock_stream_generator_third():
-        # Third iteration: engine_1 slow call, engine_2 deleted
-        yield create_mock_pod_event(
-            "ADDED", "engine_1", "10.0.0.1", ready=True, model_label="model-1"
-        )
-        yield create_mock_pod_event(
-            "DELETED", "engine_2", "10.0.0.2", ready=True, model_label="model-2"
-        )
-        raise Exception("Simulated timeout")
-
-    # Mock the watcher stream to use our generators
-    mock_k8s_dependencies["watcher"].stream.side_effect = [
-        mock_stream_generator_first(),
-        mock_stream_generator_second(),
-        mock_stream_generator_third(),
-    ]
-
-    # Mock the requests.get to simulate slow response
-    mock_k8s_dependencies["requests"].get.side_effect = mock_slow_requests_get
-
-    # Mock sleep mode check to return False
-    with patch.object(
-        K8sPodIPServiceDiscovery, "_check_engine_sleep_mode", return_value=False
-    ):
-        discovery = K8sPodIPServiceDiscovery(
-            app=mock_app, namespace="test-namespace", port="8000"
-        )
-
-        # First iteration: Give time for both engines to be added
-        time.sleep(0.5)
-        discovery.running = False
-
-        # Check that both engines are in available_engines after first iteration
-        assert len(discovery.available_engines) == 2
-        assert "engine_1" in discovery.available_engines
-        assert "engine_2" in discovery.available_engines
-        discovery.running = True
-
-        # Second iteration: Give time for the second iteration (should be no change)
-        time.sleep(0.5)
-        discovery.running = False
-
-        # Check that both engines are still in available_engines after second iteration
-        assert len(discovery.available_engines) == 2
-        assert "engine_1" in discovery.available_engines
-        assert "engine_2" in discovery.available_engines
-        should_call_false = True
-        discovery.running = True
-
-        # Third iteration: Give time for the third iteration
-        # The slow call to engine_1's /v1/models should block processing
-        # and prevent the DELETED event for engine_2 from being processed
-        time.sleep(0.5)
-        discovery.running = False
-
-        # Check that engine_2 is still in available_engines because the DELETED event
-        # was not processed due to the slow /v1/models call blocking the stream
-        assert len(discovery.available_engines) == 2
-        assert "engine_1" in discovery.available_engines
-        assert "engine_2" in discovery.available_engines  # Should still be here!
-
-        # Verify that engine_1 is still there (even though the /v1/models call was slow)
-        engine_1 = discovery.available_engines["engine_1"]
-        engine_2 = discovery.available_engines["engine_2"]
-
-        assert isinstance(engine_1, EndpointInfo)
-        assert isinstance(engine_2, EndpointInfo)
-        assert engine_1.url == "http://10.0.0.1:8000"
-        assert engine_2.url == "http://10.0.0.2:8000"
-
-        discovery.close()
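The tests deleted above synchronized with the background watcher thread through
fixed time.sleep() calls and by toggling discovery.running, which made them
timing-sensitive (the "hardcoded 0.1" comment in scenario 1 acknowledges this).
If similar tests are reintroduced, a small polling helper along these lines
could replace the fixed sleeps; wait_for is a hypothetical helper, not part of
the repository:

    import time

    def wait_for(predicate, timeout=5.0, interval=0.05):
        """Poll predicate until it returns True or timeout elapses."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if predicate():
                return True
            time.sleep(interval)
        return False

    # Instead of time.sleep(0.3) followed by assertions:
    # assert wait_for(lambda: len(discovery.available_engines) == 2)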
From 9bbfff6a4bb4459570fc415a6af2c166f7e08d99 Mon Sep 17 00:00:00 2001
From: jonoillar
Date: Wed, 27 Aug 2025 12:11:24 +0200
Subject: [PATCH 18/19] rename methods

Signed-off-by: jonoillar
---
 src/vllm_router/service_discovery.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py
index 14dc568a9..1c940e81c 100644
--- a/src/vllm_router/service_discovery.py
+++ b/src/vllm_router/service_discovery.py
@@ -418,7 +418,7 @@ def _is_pod_terminating(pod):
         """
         return pod.metadata.deletion_timestamp is not None

-    async def _get_engine_sleep_status_async(self, pod_ip) -> Optional[bool]:
+    async def _get_engine_sleep_status(self, pod_ip) -> Optional[bool]:
         """
         Get the engine sleeping status by querying the engine's
         '/is_sleeping' endpoint.
@@ -684,7 +684,7 @@ async def _process_single_event(self, event: dict):
             preprocessed_data = await self._preprocess_event(pod, pod_ip)

             # Call the async engine update handler
-            await self._on_engine_update_async(
+            await self._on_engine_update(
                 pod_name,
                 pod_ip,
                 event_type,
@@ -722,7 +722,7 @@ async def _preprocess_event(self, pod, pod_ip: str) -> dict:
             "model_label": model_label,
         }

-    async def _add_engine_async(
+    async def _add_engine(
         self, engine_name: str, engine_ip: str, model_names: List[str], model_label: str
     ):
         logger.info(
@@ -735,7 +735,7 @@ async def _add_engine_async(
         )

         # Check if engine is enabled with sleep mode and set engine sleep status
         if self._check_engine_sleep_mode(engine_name):
-            sleep_status = await self._get_engine_sleep_status_async(engine_ip)
+            sleep_status = await self._get_engine_sleep_status(engine_ip)
         else:
             sleep_status = False

@@ -760,7 +760,7 @@ def _delete_engine(self, engine_name: str):
         with self.available_engines_lock:
             del self.available_engines[engine_name]

-    async def _on_engine_update_async(
+    async def _on_engine_update(
         self,
         engine_name: str,
         engine_ip: Optional[str],
@@ -779,7 +779,7 @@ async def _on_engine_update_async(
             if not model_names:
                 return

-            await self._add_engine_async(
+            await self._add_engine(
                 engine_name, engine_ip, model_names, model_label
             )

@@ -794,7 +794,7 @@ async def _on_engine_update_async(
                 return

             if is_pod_ready and model_names:
-                await self._add_engine_async(
+                await self._add_engine(
                     engine_name, engine_ip, model_names, model_label
                 )
                 return

From 6fef26fb166806f93082d2522afb70846405b4eb Mon Sep 17 00:00:00 2001
From: jonoillar
Date: Mon, 1 Sep 2025 20:51:17 +0200
Subject: [PATCH 19/19] black change

Signed-off-by: jonoillar
---
 src/vllm_router/service_discovery.py | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/src/vllm_router/service_discovery.py b/src/vllm_router/service_discovery.py
index 1c940e81c..50d64c6b7 100644
--- a/src/vllm_router/service_discovery.py
+++ b/src/vllm_router/service_discovery.py
@@ -16,17 +16,17 @@
 import enum
 import hashlib
 import os
+import queue
 import threading
 import time
 import uuid
+from collections import OrderedDict
 from dataclasses import dataclass
 from typing import Dict, List, Optional

 import aiohttp
 import requests
-import queue
 from kubernetes import client, config, watch
-from collections import OrderedDict

 from vllm_router import utils
 from vllm_router.log import init_logger
@@ -779,9 +779,7 @@ async def _on_engine_update(
             if not model_names:
                 return

-            await self._add_engine(
-                engine_name, engine_ip, model_names, model_label
-            )
+            await self._add_engine(engine_name, engine_ip, model_names, model_label)

         elif event == "DELETED":
             if engine_name not in self.available_engines:
@@ -794,9 +792,7 @@ async def _on_engine_update(
                 return

             if is_pod_ready and model_names:
-                await self._add_engine(
-                    engine_name, engine_ip, model_names, model_label
-                )
+                await self._add_engine(engine_name, engine_ip, model_names, model_label)
                 return

             if (