Merged
25 commits
137a895
Update README.md
zhixiongli2011 Sep 19, 2025
5ccb21c
Update README.md
zhixiongli2011 Sep 19, 2025
6291f02
Merge branch 'ai-dynamo:main' into main
zhixiongli2011 Sep 25, 2025
495ec19
Merge branch 'ai-dynamo:main' into main
zhixiongli2011 Sep 26, 2025
732f574
Update test_router_e2e_with_mockers.py with a new helper function
zhixiongli2011 Sep 27, 2025
a56f295
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
460b1c4
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
51786dc
Update tests/router/test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
163fe70
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
b9fe4b8
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
28ff962
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
45ca761
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
60bfa53
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
e90f558
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
29b184f
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
8e19e41
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
93502ed
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
a9ae437
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 27, 2025
d4f11ae
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 29, 2025
30b0de4
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 29, 2025
288c263
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 29, 2025
f703fd3
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 29, 2025
3f4d77b
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 30, 2025
e4682ef
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 30, 2025
abfda5a
Update test_router_e2e_with_mockers.py
zhixiongli2011 Sep 30, 2025
351 changes: 174 additions & 177 deletions tests/router/test_router_e2e_with_mockers.py
@@ -277,6 +277,92 @@ async def send_single_request(session: aiohttp.ClientSession, request_id: int):
logger.info(f"All {num_requests} requests completed successfully")


async def send_request_via_python_kv_router(
kv_python_router: KvPushRouter,
token_ids: list,
initial_wait: float,
max_retries: int,
stop_conditions: Optional[dict] = None,
sampling_options: Optional[dict] = None,
output_options: Optional[dict] = None,
router_config_override: Optional[dict] = None,
worker_id: Optional[
int
] = None, # If None, Router will select the best available worker
):
"""Send a request to the specified mocker instance.
Returns True if mockers respond, otherwise raises or returns False.
"""

wait_time = initial_wait

log_message = (
f"the mocker with worker_id={worker_id}"
if worker_id is not None
else "the best available mocker"
)

# Retry loop: send the request to a mocker worker with exponential backoff
for attempt in range(max_retries + 1):
try:
logger.info(f"Sending request to {log_message} (attempt {attempt + 1})")

stream = await kv_python_router.generate(
token_ids=token_ids,
model=MODEL_NAME,
stop_conditions=stop_conditions,
sampling_options=sampling_options,
output_options=output_options,
router_config_override=router_config_override,
# worker_id=worker_id
)

if stream is not None:
logger.info(f"Request succeeded on attempt {attempt + 1}")
break

except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
await asyncio.sleep(wait_time)
wait_time *= 2
else:
raise RuntimeError(
f"Failed to connect to mockers after {max_retries + 1} attempts: {e}"
)

# Collect tokens from the response stream
generated_tokens = []
async for response in stream:
if isinstance(response, dict):
# Check if response has token_ids
if "token_ids" in response:
tokens = response["token_ids"]
if isinstance(tokens, list):
generated_tokens.extend(tokens)
logger.debug(f"Received {len(tokens)} tokens: {tokens}")

# Check for finish reason
if "finish_reason" in response:
logger.info(f"Stream finished with reason: {response['finish_reason']}")

# Verify that the expected number of tokens was generated
logger.info(f"Total generated tokens: {len(generated_tokens)}")
if stop_conditions and "max_tokens" in stop_conditions:
max_tokens = int(stop_conditions["max_tokens"])
assert len(generated_tokens) == max_tokens, (
f"Expected exactly {max_tokens} tokens but got {len(generated_tokens)}. "
f"Tokens: {generated_tokens}"
)

logger.info(
f"Successfully verified {max_tokens} tokens generated via KvPushRouter"
)
return True

return False
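
A minimal usage sketch of the new helper, for reference: the wrapper name readiness_probe is hypothetical, and it assumes a kv_push_router instance already constructed against the mockers' "generate" endpoint, exactly as test_kv_push_router_bindings does further down in this diff.

async def readiness_probe(kv_push_router: KvPushRouter) -> bool:
    # Dummy probe: three input tokens, one generated token, up to 8 retries
    # with a 1-second initial wait that doubles after each failed attempt.
    return await send_request_via_python_kv_router(
        kv_python_router=kv_push_router,
        token_ids=[1, 2, 3],
        initial_wait=1.0,
        max_retries=8,
        stop_conditions={"max_tokens": 1},
    )

A synchronous test would drive it with asyncio.run(readiness_probe(kv_push_router)), mirroring the readiness check in the updated test below.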


@pytest.mark.pre_merge
@pytest.mark.model(MODEL_NAME)
def test_mocker_kv_router(request, runtime_services, predownload_tokenizers):
@@ -584,201 +670,112 @@ def test_kv_push_router_bindings(request, runtime_services, predownload_tokenizers):
logger.info(f"All mockers using endpoint: {mockers.endpoint}")
mockers.__enter__()

# Wait for mockers to be ready by sending a dummy request with retry
async def wait_for_mockers_ready():
"""Send a dummy request to ensure mockers are ready"""
runtime = get_runtime()
# Use the namespace from the mockers
namespace = runtime.namespace(mockers.namespace)
component = namespace.component("mocker")
endpoint = component.endpoint("generate")

kv_router_config = KvRouterConfig()
kv_push_router = KvPushRouter(
endpoint=endpoint,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)

# Dummy request with minimal tokens
dummy_token_ids = [1, 2, 3] # Just a few tokens for testing
max_retries = 8
wait_time = 1

for attempt in range(max_retries + 1):
try:
logger.info(
f"Sending dummy request to check mocker readiness (attempt {attempt + 1})"
)
stream = await kv_push_router.generate(
token_ids=dummy_token_ids,
model=MODEL_NAME,
stop_conditions={"max_tokens": 1}, # Generate just 1 token
sampling_options={"temperature": 0.7},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
)

# Consume the stream to verify it works
token_count = 0
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
token_count += len(response["token_ids"])

logger.info(
f"Mockers are ready! Dummy request succeeded on attempt {attempt + 1}"
)
return True

except Exception as e:
logger.warning(f"Attempt {attempt + 1} failed with error: {e}")
if attempt < max_retries:
await asyncio.sleep(wait_time)
wait_time *= 2 # Exponential backoff
else:
raise RuntimeError(
f"Failed to connect to mockers after {max_retries + 1} attempts"
)

return False

# Wait for mockers to be ready
asyncio.run(wait_for_mockers_ready())

# Run the async test
async def test_kv_push_router():
# Get runtime and create endpoint
runtime = get_runtime()
# Use the namespace from the mockers
namespace = runtime.namespace(mockers.namespace)
component = namespace.component("mocker")
endpoint = component.endpoint("generate")
# Get runtime and create endpoint
runtime = get_runtime()
# Use the namespace from the mockers
namespace = runtime.namespace(mockers.namespace)
component = namespace.component("mocker")
endpoint = component.endpoint("generate")

# Create KvRouterConfig with default settings
kv_router_config = KvRouterConfig()

# Create KvPushRouter Python object
kv_push_router = KvPushRouter(
endpoint=endpoint,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
)

# Create KvRouterConfig with default settings
kv_router_config = KvRouterConfig()
logger.info("Created KvPushRouter Python object")

# Create KvPushRouter Python object
kv_push_router = KvPushRouter(
endpoint=endpoint,
block_size=BLOCK_SIZE,
kv_router_config=kv_router_config,
# Check that the mockers are ready by sending a dummy request
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=[1, 2, 3],
initial_wait=1.0,
max_retries=8,
stop_conditions={"max_tokens": 1}, # Generate just 1 token
sampling_options={"temperature": 0.7},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
)
)

logger.info("Created KvPushRouter Python object")

# Generate random token IDs (100 to 200 tokens)
num_input_tokens = random.randint(100, 200)
token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]

logger.info(f"Generated {num_input_tokens} random token IDs")

# Set up generation parameters
stop_conditions = {
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 20, # Generate exactly 20 tokens
}

sampling_options = {"temperature": 0.7, "top_p": 0.9}
# Generate random token IDs (100 to 200 tokens)
num_input_tokens = random.randint(100, 200)
token_ids = [random.randint(1, 10000) for _ in range(num_input_tokens)]

output_options = {"include_input_tokens": False, "return_full_text": False}
# Set up override parameters
router_config_override = {
"overlap_score_weight": 0.5, # Override the default weight
"router_temperature": 0.5, # Override the default temperature
}

# Test with router config overrides
router_config_override = {
"overlap_score_weight": 0.5, # Override the default weight
"router_temperature": 0.5, # Override the default temperature
}
logger.info(f"Generated {num_input_tokens} random token IDs")

# Call generate method
logger.info(
"Calling generate method on KvPushRouter with router config overrides"
)
logger.info(f"Router config overrides: {router_config_override}")
stream = await kv_push_router.generate(
# Test with full overrides
logger.info(
f"Testing with full router config overrides: {router_config_override}"
)
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids,
model=MODEL_NAME,
stop_conditions=stop_conditions,
sampling_options=sampling_options,
output_options=output_options,
initial_wait=1.0,
max_retries=8,
stop_conditions={
"ignore_eos": True, # Don't stop on EOS token
"max_tokens": 20, # Generate exactly 20 tokens
},
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=router_config_override,
)
)

# Collect tokens from the SSE stream
generated_tokens = []
async for response in stream:
if isinstance(response, dict):
# Check if response has token_ids
if "token_ids" in response:
tokens = response["token_ids"]
if isinstance(tokens, list):
generated_tokens.extend(tokens)
logger.debug(f"Received {len(tokens)} tokens: {tokens}")

# Check for finish reason
if "finish_reason" in response:
logger.info(
f"Stream finished with reason: {response['finish_reason']}"
)

# Verify we got exactly 20 tokens
logger.info(f"Total generated tokens: {len(generated_tokens)}")
assert len(generated_tokens) == 20, (
f"Expected exactly 20 tokens but got {len(generated_tokens)}. "
f"Tokens: {generated_tokens}"
)

logger.info(
"Successfully verified 20 tokens generated via KvPushRouter with overrides"
)

# Test again without overrides
logger.info("Testing again without router config overrides")
stream = await kv_push_router.generate(
token_ids=token_ids[:50], # Use fewer tokens for second test
model=MODEL_NAME,
# Test without overrides
logger.info("Testing without router config overrides")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids[:50],  # Use fewer tokens for second test
initial_wait=1.0,
max_retries=8,
stop_conditions={"max_tokens": 10},
sampling_options=sampling_options,
output_options=output_options,
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
# No router_config_override this time
)
)

generated_tokens_no_override = []
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
generated_tokens_no_override.extend(response["token_ids"])

assert (
len(generated_tokens_no_override) == 10
), f"Expected 10 tokens but got {len(generated_tokens_no_override)}"
logger.info("Successfully verified generation without overrides")

# Test with partial override (only temperature)
logger.info(
"Testing with partial router config override (temperature only)"
)
partial_override = {"router_temperature": 0.1}
stream = await kv_push_router.generate(
token_ids=token_ids[:30], # Use even fewer tokens
model=MODEL_NAME,
# Test with partial override (only temperature)
partial_override = {"router_temperature": 0.1}
logger.info(f"Testing with partial router config overrides: {partial_override}")
asyncio.run(
send_request_via_python_kv_router(
kv_python_router=kv_push_router,
token_ids=token_ids[:30],  # Use fewer tokens for third test
initial_wait=1.0,
max_retries=8,
stop_conditions={"max_tokens": 5},
sampling_options=sampling_options,
output_options=output_options,
sampling_options={"temperature": 0.7, "top_p": 0.9},
output_options={
"include_input_tokens": False,
"return_full_text": False,
},
router_config_override=partial_override,
)

generated_tokens_partial = []
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
generated_tokens_partial.extend(response["token_ids"])

assert (
len(generated_tokens_partial) == 5
), f"Expected 5 tokens but got {len(generated_tokens_partial)}"
logger.info("Successfully verified generation with partial override")

# Run the async test
asyncio.run(test_kv_push_router())
)

logger.info("KvPushRouter bindings test completed successfully")
