diff --git a/tests/router/test_router_e2e_with_mockers.py b/tests/router/test_router_e2e_with_mockers.py
index b8d58e4cf2..34218161d7 100644
--- a/tests/router/test_router_e2e_with_mockers.py
+++ b/tests/router/test_router_e2e_with_mockers.py
@@ -277,6 +277,97 @@ async def send_single_request(session: aiohttp.ClientSession, request_id: int):
     logger.info(f"All {num_requests} requests completed successfully")
 
 
+async def send_request_via_python_kv_router(
+    kv_python_router: KvPushRouter,
+    token_ids: list,
+    initial_wait: float,
+    max_retries: int,
+    stop_conditions: Optional[dict] = None,
+    sampling_options: Optional[dict] = None,
+    output_options: Optional[dict] = None,
+    router_config_override: Optional[dict] = None,
+    worker_id: Optional[
+        int
+    ] = None,  # If None, Router will select the best available worker
+):
+    """Send a request to the specified mocker instance.
+
+    Returns True once the generated token count has been verified, False when
+    no verification applies; raises RuntimeError if the mockers never respond.
+    """
+
+    wait_time = initial_wait
+
+    log_message = (
+        f"the mocker with worker_id={worker_id}"
+        if worker_id is not None
+        else "the best available mocker"
+    )
+
+    # Retry loop sending the request to a mocker worker with exponential backoff
+    for attempt in range(max_retries + 1):
+        try:
+            logger.info(f"Sending request to {log_message} (attempt {attempt + 1})")
+
+            stream = await kv_python_router.generate(
+                token_ids=token_ids,
+                model=MODEL_NAME,
+                stop_conditions=stop_conditions,
+                sampling_options=sampling_options,
+                output_options=output_options,
+                router_config_override=router_config_override,
+                # worker_id=worker_id,
+            )
+
+            if stream is not None:
+                logger.info(f"Request succeeded on attempt {attempt + 1}")
+                break
+
+        except Exception as e:
+            logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
+            if attempt < max_retries:
+                await asyncio.sleep(wait_time)
+                wait_time *= 2
+            else:
+                raise RuntimeError(
+                    f"Failed to connect to mockers after {max_retries + 1} attempts: {e}"
+                )
+
+    # Collect tokens from the SSE stream
+    generated_tokens = []
+    async for response in stream:
+        if isinstance(response, dict):
+            # Check if response has token_ids
+            if "token_ids" in response:
+                tokens = response["token_ids"]
+                if isinstance(tokens, list):
+                    generated_tokens.extend(tokens)
+                    logger.debug(f"Received {len(tokens)} tokens: {tokens}")
+
+            # Check for finish reason
+            if "finish_reason" in response:
+                logger.info(f"Stream finished with reason: {response['finish_reason']}")
+
+    # Verify the expected number of tokens is generated when max_tokens is
+    # specified and ignore_eos is True
+    logger.info(f"Total generated tokens: {len(generated_tokens)}")
+    if (
+        stop_conditions
+        and "max_tokens" in stop_conditions
+        and "ignore_eos" in stop_conditions
+        and stop_conditions["ignore_eos"]
+    ):
+        max_tokens = int(stop_conditions["max_tokens"])
+        assert len(generated_tokens) == max_tokens, (
+            f"Expected exactly {max_tokens} tokens but got {len(generated_tokens)}. "
" + f"Tokens: {generated_tokens}" + ) + + logger.info( + f"Successfully verified {max_tokens} tokens generated as expected via KvPushRouter with ignore_eos=True" + ) + return True + + return False + + @pytest.mark.pre_merge @pytest.mark.model(MODEL_NAME) def test_mocker_kv_router(request, runtime_services, predownload_tokenizers): @@ -288,7 +379,7 @@ def test_mocker_kv_router(request, runtime_services, predownload_tokenizers): # runtime_services starts etcd and nats logger.info("Starting mocker KV router test") - # Create mocker args dictionary + # Create mocker args dictiona: FixtureRequestry: tuple[NatsServer, EtcdServer]: NoneType mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} try: @@ -340,7 +431,7 @@ def test_mocker_two_kv_router(request, runtime_services, predownload_tokenizers) # runtime_services starts etcd and nats logger.info("Starting mocker two KV router test") - # Create mocker args dictionary + # Create mocker args dictionary: FixtureRequest: tuple[NatsServer, EtcdServer]: NoneType mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE} kv_routers = [] @@ -404,7 +495,6 @@ def test_mocker_kv_router_overload_503( # runtime_services starts etcd and nats logger.info("Starting mocker KV router overload test for 503 status") - # Create mocker args dictionary with limited resources mocker_args = { "speedup_ratio": 10, @@ -584,201 +674,118 @@ def test_kv_push_router_bindings(request, runtime_services, predownload_tokenize logger.info(f"All mockers using endpoint: {mockers.endpoint}") mockers.__enter__() - # Wait for mockers to be ready by sending a dummy request with retry - async def wait_for_mockers_ready(): - """Send a dummy request to ensure mockers are ready""" - runtime = get_runtime() - # Use the namespace from the mockers - namespace = runtime.namespace(mockers.namespace) - component = namespace.component("mocker") - endpoint = component.endpoint("generate") - - kv_router_config = KvRouterConfig() - kv_push_router = KvPushRouter( - endpoint=endpoint, - block_size=BLOCK_SIZE, - kv_router_config=kv_router_config, - ) - - # Dummy request with minimal tokens - dummy_token_ids = [1, 2, 3] # Just a few tokens for testing - max_retries = 8 - wait_time = 1 - - for attempt in range(max_retries + 1): - try: - logger.info( - f"Sending dummy request to check mocker readiness (attempt {attempt + 1})" - ) - stream = await kv_push_router.generate( - token_ids=dummy_token_ids, - model=MODEL_NAME, - stop_conditions={"max_tokens": 1}, # Generate just 1 token - sampling_options={"temperature": 0.7}, - output_options={ - "include_input_tokens": False, - "return_full_text": False, - }, - ) - - # Consume the stream to verify it works - token_count = 0 - async for response in stream: - if isinstance(response, dict) and "token_ids" in response: - token_count += len(response["token_ids"]) - - logger.info( - f"Mockers are ready! 
-                    f"Mockers are ready! Dummy request succeeded on attempt {attempt + 1}"
-                )
-                return True
-
-            except Exception as e:
-                logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
-                if attempt < max_retries:
-                    await asyncio.sleep(wait_time)
-                    wait_time *= 2  # Exponential backoff
-                else:
-                    raise RuntimeError(
-                        f"Failed to connect to mockers after {max_retries + 1} attempts"
-                    )
-
-        return False
-
-    # Wait for mockers to be ready
-    asyncio.run(wait_for_mockers_ready())
-
-    # Run the async test
-    async def test_kv_push_router():
-        # Get runtime and create endpoint
-        runtime = get_runtime()
-        # Use the namespace from the mockers
-        namespace = runtime.namespace(mockers.namespace)
-        component = namespace.component("mocker")
-        endpoint = component.endpoint("generate")
+    # Get runtime and create endpoint
+    runtime = get_runtime()
+    # Use the namespace from the mockers
+    namespace = runtime.namespace(mockers.namespace)
+    component = namespace.component("mocker")
+    endpoint = component.endpoint("generate")
+
+    # Create KvRouterConfig with default settings
+    kv_router_config = KvRouterConfig()
+
+    # Create KvPushRouter Python object
+    kv_push_router = KvPushRouter(
+        endpoint=endpoint,
+        block_size=BLOCK_SIZE,
+        kv_router_config=kv_router_config,
+    )
 
-        # Create KvRouterConfig with default settings
-        kv_router_config = KvRouterConfig()
+    logger.info("Created KvPushRouter Python object")
 
-        # Create KvPushRouter Python object
-        kv_push_router = KvPushRouter(
-            endpoint=endpoint,
-            block_size=BLOCK_SIZE,
-            kv_router_config=kv_router_config,
+    # Initialize and check the readiness of the mockers by sending a dummy request
+    asyncio.run(
+        send_request_via_python_kv_router(
+            kv_python_router=kv_push_router,
+            token_ids=[1, 2, 3],
+            initial_wait=1.0,
+            max_retries=8,
+            stop_conditions={"max_tokens": 1},  # Generate just 1 token
+            sampling_options={"temperature": 0.7},
+            output_options={
+                "include_input_tokens": False,
+                "return_full_text": False,
+            },
         )
+    )
 
-        logger.info("Created KvPushRouter Python object")
-
-        # Generate random token IDs (100 to 200 tokens)
-        num_input_tokens = random.randint(100, 200)
-        token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]
-
-        logger.info(f"Generated {num_input_tokens} random token IDs")
-
-        # Set up generation parameters
-        stop_conditions = {
-            "ignore_eos": True,  # Don't stop on EOS token
-            "max_tokens": 20,  # Generate exactly 20 tokens
-        }
-
-        sampling_options = {"temperature": 0.7, "top_p": 0.9}
+    # Generate random token IDs (100 to 200 tokens)
+    num_input_tokens = random.randint(100, 200)
+    token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]
 
-        output_options = {"include_input_tokens": False, "return_full_text": False}
+    # Set up override parameters
+    router_config_override = {
+        "overlap_score_weight": 0.5,  # Override the default weight
+        "router_temperature": 0.5,  # Override the default temperature
+    }
 
-        # Test with router config overrides
-        router_config_override = {
-            "overlap_score_weight": 0.5,  # Override the default weight
-            "router_temperature": 0.5,  # Override the default temperature
-        }
+    logger.info(f"Generated {num_input_tokens} random token IDs")
 
-        # Call generate method
-        logger.info(
-            "Calling generate method on KvPushRouter with router config overrides"
-        )
-        logger.info(f"Router config overrides: {router_config_override}")
-        stream = await kv_push_router.generate(
+    # Test with full overrides
+    logger.info(
+        f"Testing with full router config overrides: {router_config_override}"
+    )
+    asyncio.run(
+        send_request_via_python_kv_router(
+            kv_python_router=kv_push_router,
             token_ids=token_ids,
-            model=MODEL_NAME,
-            stop_conditions=stop_conditions,
-            sampling_options=sampling_options,
-            output_options=output_options,
+            initial_wait=1.0,
+            max_retries=8,
+            stop_conditions={
+                "ignore_eos": True,  # Don't stop on EOS token
+                "max_tokens": 20,  # Generate exactly 20 tokens
+            },
+            sampling_options={"temperature": 0.7, "top_p": 0.9},
+            output_options={
+                "include_input_tokens": False,
+                "return_full_text": False,
+            },
             router_config_override=router_config_override,
         )
+    )
 
-        # Collect tokens from the SSE stream
-        generated_tokens = []
-        async for response in stream:
-            if isinstance(response, dict):
-                # Check if response has token_ids
-                if "token_ids" in response:
-                    tokens = response["token_ids"]
-                    if isinstance(tokens, list):
-                        generated_tokens.extend(tokens)
-                        logger.debug(f"Received {len(tokens)} tokens: {tokens}")
-
-                # Check for finish reason
-                if "finish_reason" in response:
-                    logger.info(
-                        f"Stream finished with reason: {response['finish_reason']}"
-                    )
-
-        # Verify we got exactly 20 tokens
-        logger.info(f"Total generated tokens: {len(generated_tokens)}")
-        assert len(generated_tokens) == 20, (
-            f"Expected exactly 20 tokens but got {len(generated_tokens)}. "
-            f"Tokens: {generated_tokens}"
-        )
-
-        logger.info(
-            "Successfully verified 20 tokens generated via KvPushRouter with overrides"
-        )
-
-        # Test again without overrides
-        logger.info("Testing again without router config overrides")
-        stream = await kv_push_router.generate(
-            token_ids=token_ids[:50],  # Use fewer tokens for second test
-            model=MODEL_NAME,
-            stop_conditions={"max_tokens": 10},
-            sampling_options=sampling_options,
-            output_options=output_options,
+    # Test without overrides
+    logger.info("Testing without router config overrides")
+    asyncio.run(
+        send_request_via_python_kv_router(
+            kv_python_router=kv_push_router,
+            token_ids=token_ids[:50],  # Use fewer tokens for second test
+            initial_wait=1.0,
+            max_retries=8,
+            stop_conditions={
+                "ignore_eos": True,  # Don't stop on EOS token
+                "max_tokens": 10,  # Generate exactly 10 tokens for the second test
+            },
+            sampling_options={"temperature": 0.7, "top_p": 0.9},
+            output_options={
+                "include_input_tokens": False,
+                "return_full_text": False,
+            },
             # No router_config_override this time
         )
+    )
 
-        generated_tokens_no_override = []
-        async for response in stream:
-            if isinstance(response, dict) and "token_ids" in response:
-                generated_tokens_no_override.extend(response["token_ids"])
-
-        assert (
-            len(generated_tokens_no_override) == 10
-        ), f"Expected 10 tokens but got {len(generated_tokens_no_override)}"
-        logger.info("Successfully verified generation without overrides")
-
-        # Test with partial override (only temperature)
-        logger.info(
-            "Testing with partial router config override (temperature only)"
-        )
-        partial_override = {"router_temperature": 0.1}
-        stream = await kv_push_router.generate(
-            token_ids=token_ids[:30],  # Use even fewer tokens
-            model=MODEL_NAME,
-            stop_conditions={"max_tokens": 5},
-            sampling_options=sampling_options,
-            output_options=output_options,
+    # Test with partial override (only temperature)
+    partial_override = {"router_temperature": 0.1}
+    logger.info(f"Testing with partial router config overrides: {partial_override}")
+    asyncio.run(
+        send_request_via_python_kv_router(
+            kv_python_router=kv_push_router,
+            token_ids=token_ids[:30],  # Use fewer tokens for third test
+            initial_wait=1.0,
+            max_retries=8,
+            stop_conditions={
+                "ignore_eos": True,  # Don't stop on EOS token
+                "max_tokens": 5,  # Generate exactly 5 tokens for the third test
+            },
+            sampling_options={"temperature": 0.7, "top_p": 0.9},
+            output_options={
+                "include_input_tokens": False,
+                "return_full_text": False,
+            },
             router_config_override=partial_override,
         )
-
-        generated_tokens_partial = []
-        async for response in stream:
-            if isinstance(response, dict) and "token_ids" in response:
-                generated_tokens_partial.extend(response["token_ids"])
-
-        assert (
-            len(generated_tokens_partial) == 5
-        ), f"Expected 5 tokens but got {len(generated_tokens_partial)}"
-        logger.info("Successfully verified generation with partial override")
-
-    # Run the async test
-    asyncio.run(test_kv_push_router())
+    )
 
     logger.info("KvPushRouter bindings test completed successfully")
@@ -799,7 +806,7 @@ def test_indexers_sync(request, runtime_services, predownload_tokenizers):
     # runtime_services starts etcd and nats
     logger.info("Starting indexers sync test")
 
-    # Create mocker args dictionary
+    # Create mocker args dictionary
     mocker_args = {"speedup_ratio": SPEEDUP_RATIO, "block_size": BLOCK_SIZE}
 
     try:
@@ -1028,7 +1035,7 @@ def test_query_instance_id_returns_worker_and_tokens(
     1. NOT route the request to a worker immediately
     2. Return worker_instance_id as an SSE event
     3. Return token_data as an SSE event containing the request tokens
-    4. Terminate the stream with [DONE]
+    4. Terminate the stream with [DONE]
 
     This tests the specific code block: if query_instance_id {
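
Note for reviewers: below is a minimal sketch of how the new helper is meant to be driven, in case you want to exercise it outside pytest. It assumes the same module context as the diff above (`get_runtime`, `KvPushRouter`, `KvRouterConfig`, `send_request_via_python_kv_router`, and the `MODEL_NAME` and `BLOCK_SIZE` constants from `tests/router/test_router_e2e_with_mockers.py`) plus a running etcd/nats deployment with mocker workers registered; the namespace string is a placeholder, not part of the patch.

```python
import asyncio

# Sketch only: mirrors the call pattern used by test_kv_push_router_bindings.
# All names are assumed to resolve as in tests/router/test_router_e2e_with_mockers.py.


async def main() -> None:
    runtime = get_runtime()
    endpoint = (
        runtime.namespace("example-namespace")  # placeholder namespace
        .component("mocker")
        .endpoint("generate")
    )
    router = KvPushRouter(
        endpoint=endpoint,
        block_size=BLOCK_SIZE,
        kv_router_config=KvRouterConfig(),
    )
    # With ignore_eos=True and max_tokens set, the helper asserts the exact
    # token count and returns True; retries back off exponentially from
    # initial_wait (1s, 2s, 4s, ...) across max_retries + 1 attempts.
    verified = await send_request_via_python_kv_router(
        kv_python_router=router,
        token_ids=[1, 2, 3, 4],
        initial_wait=1.0,
        max_retries=3,
        stop_conditions={"ignore_eos": True, "max_tokens": 8},
        sampling_options={"temperature": 0.7},
        output_options={"include_input_tokens": False, "return_full_text": False},
    )
    assert verified


asyncio.run(main())
```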