Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions tests/checkpoint_engine/test_special_server_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def init_config() -> DictConfig:
config = compose(
config_name="ppo_trainer",
overrides=[
"+async_training.partial_rollout_resume=True",
"+async_training.partial_rollout=True",
],
)

Expand Down Expand Up @@ -73,8 +73,8 @@ async def _run_update_weights_with_global_steps_none(
assert output.stop_reason not in ("aborted", "abort"), (
f"output.stop_reason is {output.stop_reason}, expected not abort"
)
assert output.extra_info["global_steps"] is None, (
f"output.extra_info['global_steps'] is {output.extra_info['global_steps']}, expected None"
assert output.extra_fields["global_steps"] is None, (
f"output.extra_fields['global_steps'] is {output.extra_fields['global_steps']}, expected None"
)
print("========== [update_weights with global_steps=None] ==========")
print("[RESPONSE]", tokenizer.decode(output.token_ids, skip_special_tokens=True))
Expand Down Expand Up @@ -112,7 +112,7 @@ async def _run_server_manager_without_resume(
outputs = await asyncio.gather(*tasks)
expected_steps = global_steps - 1
for output in outputs:
global_steps = output.extra_info["global_steps"]
global_steps = output.extra_fields["global_steps"]
assert output.stop_reason in ("aborted", "abort"), (
f"output.stop_reason is {output.stop_reason}, expected in abort"
)
Expand Down Expand Up @@ -156,8 +156,8 @@ async def _run_server_manager_with_resume(
outputs = await asyncio.gather(*tasks)
expected_min_steps = initial_steps - 1
for output in outputs:
min_global_steps = output.extra_info["min_global_steps"]
max_global_steps = output.extra_info["max_global_steps"]
min_global_steps = output.extra_fields["min_global_steps"]
max_global_steps = output.extra_fields["max_global_steps"]
assert min_global_steps == expected_min_steps, (
f"output.min_global_steps is {min_global_steps}, expected {expected_min_steps}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Optional

import numpy as np
Expand All @@ -29,17 +28,8 @@
_InternalAgentLoopOutput,
)
from verl.experimental.agent_loop.single_turn_agent_loop import SingleTurnAgentLoop
from verl.experimental.fully_async_policy.agent_loop.partial_single_turn_agent_loop import PartialSingleTurnAgentLoop
from verl.protocol import DataProto
from verl.utils.dataset.rl_dataset import RLHFDataset


@dataclass
class _FakeTokenOutput:
token_ids: list[int]
log_probs: Optional[list[float]] = None
routed_experts: Any = None
num_preempted: Optional[int] = None
from verl.workers.rollout.replica import TokenOutput


class _FakeServerManager:
Expand All @@ -51,10 +41,10 @@ async def generate(
sampling_params: dict[str, Any],
image_data: Optional[list[Any]] = None,
video_data: Optional[list[Any]] = None,
) -> _FakeTokenOutput:
) -> TokenOutput:
del request_id, sampling_params, image_data, video_data
# Return a short, deterministic "generation" for testing.
return _FakeTokenOutput(token_ids=prompt_ids[-1:] + [11, 12, 13], log_probs=[0.0, 0.0, 0.0, 0.0])
return TokenOutput(token_ids=prompt_ids[-1:] + [11, 12, 13], log_probs=[0.0, 0.0, 0.0, 0.0])

async def generate_for_partial(
self,
Expand Down Expand Up @@ -173,88 +163,52 @@ async def test_agent_loop_extra_fields_schema_stable_for_training_concat_on_cpu(
dataset_cls=RLHFDataset,
data_config=data_config,
)
partial_single_turn = PartialSingleTurnAgentLoop(
trainer_config=trainer_config,
server_manager=server_manager,
tokenizer=tokenizer,
processor=processor,
dataset_cls=RLHFDataset,
data_config=data_config,
)

raw_prompt = [{"role": "user", "content": "hi"}]
sampling_params: dict[str, Any] = {}

out_a = await single_turn.run(sampling_params=sampling_params, raw_prompt=raw_prompt)
out_b = await partial_single_turn.run(sampling_params=sampling_params, raw_prompt=raw_prompt, param_version=0)
out = await single_turn.run(sampling_params=sampling_params, raw_prompt=raw_prompt)

# Agent loop outputs should always contain these fields with consistent types.
assert out_a.extra_fields["turn_scores"] == []
assert out_a.extra_fields["tool_rewards"] == []
assert out_b.extra_fields["turn_scores"] == []
assert out_b.extra_fields["tool_rewards"] == []

prompt_len = max(len(out_a.prompt_ids), len(out_b.prompt_ids))
response_len = max(len(out_a.response_ids), len(out_b.response_ids))
assert out.extra_fields["turn_scores"] == []
assert out.extra_fields["tool_rewards"] == []

internal_a = _to_internal(
output_prompt_ids=out_a.prompt_ids,
output_response_ids=out_a.response_ids,
output_response_mask=out_a.response_mask,
metrics=out_a.metrics,
extra_fields=out_a.extra_fields,
num_turns=out_a.num_turns,
prompt_len=prompt_len,
response_len=response_len,
)
internal_b = _to_internal(
output_prompt_ids=out_b.prompt_ids,
output_response_ids=out_b.response_ids,
output_response_mask=out_b.response_mask,
metrics=out_b.metrics,
extra_fields=out_b.extra_fields,
num_turns=out_b.num_turns,
prompt_len=prompt_len,
response_len=response_len,
output_prompt_ids=out.prompt_ids,
output_response_ids=out.response_ids,
output_response_mask=out.response_mask,
metrics=out.metrics,
extra_fields=out.extra_fields,
num_turns=out.num_turns,
prompt_len=len(out.prompt_ids),
response_len=len(out.response_ids),
)

# Mimic two "worker chunks" and concatenate as in training.
dummy_worker = type("_DummyWorker", (), {"reward_loop_worker_handles": None})()
chunk_a = AgentLoopWorker._postprocess(
merged = AgentLoopWorker._postprocess(
dummy_worker,
inputs=[internal_a],
input_non_tensor_batch={
"index": np.array([0], dtype=object),
"agent_name": np.array(["single_turn_agent"], dtype=object),
},
)
chunk_b = AgentLoopWorker._postprocess(
dummy_worker,
inputs=[internal_b],
input_non_tensor_batch={
"index": np.array([1], dtype=object),
"agent_name": np.array(["partial_single_turn_agent"], dtype=object),
},
)
merged: DataProto = DataProto.concat([chunk_a, chunk_b])

# Stable schema: present regardless of which loop produced a sample.
stable_keys = (
"turn_scores",
"tool_rewards",
"is_cancel",
"param_version_start",
"param_version_end",
"min_global_steps",
"max_global_steps",
"extras",
)
for key in stable_keys:
assert key in merged.non_tensor_batch, f"missing key in merged batch: {key}"
assert merged.non_tensor_batch[key].shape == (2,), (
assert merged.non_tensor_batch[key].shape == (1,), (
f"invalid shape for {key}: {merged.non_tensor_batch[key].shape}"
)

# And the list-typed fields are actually lists (not missing / scalar).
assert merged.non_tensor_batch["turn_scores"][0] == []
assert merged.non_tensor_batch["tool_rewards"][0] == []
assert merged.non_tensor_batch["turn_scores"][1] == []
assert merged.non_tensor_batch["tool_rewards"][1] == []
134 changes: 0 additions & 134 deletions tests/experimental/agent_loop/test_multi_modal.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,137 +434,3 @@ def test_multimodal_single_turn_agent(init_config):

print("Single turn multimodal test passed!")
ray.shutdown()


def test_multimodal_partial_single_turn_agent(init_config):
"""Test partial single turn agent loop with multimodal inputs using Qwen VL model."""

# TODO(baiyan):
# see verl/recipe/fully_async_policy/agent_loop/partial_single_turn_agent_loop.py for more details.
# if use_correct_processor=True, the test will pass but the async training will hang, so I disable this test
# for now

return

ray.init(
runtime_env={
"env_vars": {
"TOKENIZERS_PARALLELISM": "true",
"NCCL_DEBUG": "WARN",
"VLLM_LOGGING_LEVEL": "INFO",
"VLLM_USE_V1": "1",
}
},
ignore_reinit_error=True,
)
from verl.experimental.fully_async_policy.agent_loop import FullyAsyncAgentLoopManager

# =========================== 1. Init rollout manager ===========================
n = 2
init_config.actor_rollout_ref.rollout.n = n
init_config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls = 1
init_config.actor_rollout_ref.rollout.multi_turn.max_user_turns = 1
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
agent_loop_manager = loop.run_until_complete(FullyAsyncAgentLoopManager.create(init_config))

# =========================== 2. Generate sequences with multimodal prompts ===========================
# Create a simple test image
test_image = Image.new("RGB", (256, 256), (200, 100, 50))
test_image2 = Image.new("RGB", (512, 512), (100, 150, 200))

raw_prompts = [
[
{"role": "user", "content": "What is the capital of France?"},
],
[
{
"role": "user",
"content": [
{"type": "image", "image": test_image},
{"type": "text", "text": "What do you see in this image?"},
],
},
],
[
{
"role": "system",
"content": "You are Qwen VL, a helpful multimodal assistant.",
},
{
"role": "user",
"content": [
{"type": "image", "image": test_image2},
{"type": "text", "text": "Analyze the colors in this image."},
],
},
],
]

batch = DataProto(
non_tensor_batch={
"raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object),
"agent_name": np.array(["partial_single_turn_agent"] * len(raw_prompts)),
"data_source": np.array(["openai/gsm8k"] * len(raw_prompts)),
"reward_model": np.array([{"style": "rule", "ground_truth": "1.0"}] * len(raw_prompts)),
},
)

batch = batch.repeat(n)
result = agent_loop_manager.generate_sequences(prompts=batch)
assert len(result) == len(raw_prompts) * n

# Check turns - all should be single turn (2: user + assistant)
num_turns = result.non_tensor_batch["__num_turns__"]
print(f"num_turns: {num_turns}")
for i in range(len(num_turns)):
assert num_turns[i] == 2, f"Expected 2 turns but got {num_turns[i]} for sample {i}"

# Verify responses
tokenizer = hf_tokenizer(init_config.actor_rollout_ref.model.path)
prompts = result.batch["prompts"]
responses = result.batch["responses"]
response_mask = result.batch["response_mask"]
assert responses.size() == response_mask.size(), f"{responses.size()} != {response_mask.size()}"

# Check for image pads in prompts
image_pad_count = 0
for i in range(len(prompts)):
prompt_ids = prompts[i][prompts[i] != tokenizer.pad_token_id].tolist()
prompt_text = tokenizer.decode(prompt_ids)

# Check if this sample should have image pads (samples with index 1 and 2 in each repeat have images)
sample_idx = i // n
has_image_pad = "<|image_pad|>" in prompt_text or "<|vision_start|>" in prompt_text

print("=========================")
print(f"Sample {i} (original prompt index: {sample_idx}):")
print(f"Prompt length: {len(prompt_ids)} tokens")
print(f"Has image_pad: {has_image_pad}")

if sample_idx != 0: # Samples 1 and 2 should have images
if has_image_pad:
image_pad_count += 1
# Count the number of image_pad tokens
num_image_pads = prompt_text.count("<|image_pad|>")
print(f"Number of <|image_pad|> tokens: {num_image_pads}")
else:
print("WARNING: Expected image_pad but not found!")

# Show first 200 chars of prompt
print(f"Prompt text (first 200 chars): {prompt_text[:200]}...")

for i in range(len(responses)):
valid_tokens = responses[i][response_mask[i].bool()]
response_text = tokenizer.decode(valid_tokens)
print(f"Sample {i} response: {response_text[:100]}...")

# Verify that we found image pads in multimodal samples
expected_multimodal_samples = 2 * n # 2 prompts with images, repeated n times
print(f"\nFound {image_pad_count} samples with image_pad out of {expected_multimodal_samples} expected")
assert image_pad_count > 0, "No image_pad tokens found in multimodal samples!"

print("Partial single turn multimodal test passed!")
ray.shutdown()
9 changes: 5 additions & 4 deletions tests/special_e2e/run_fully_async_policy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ n_resp_per_prompt=16
train_prompt_mini_bsz=16
total_rollout_steps=$(((128)))
test_freq=-1
staleness_threshold=0.1
staleness_threshold=0.5
trigger_parameter_sync_step=4
partial_rollout=True
use_trainer_do_validate=False

exp_name="$(basename "${MODEL_ID,,}")-fully-async-policy-${ACTOR_STRATEGY}-minimal"

Expand Down Expand Up @@ -127,13 +128,13 @@ common_params=(
rollout.nnodes=1
rollout.n_gpus_per_node=${n_gpus_rollout}
rollout.total_rollout_steps=${total_rollout_steps}
rollout.total_epochs=2
rollout.test_freq=${test_freq}
trainer.total_epochs=2
trainer.test_freq=${test_freq}
# Fully async specific configurations
async_training.staleness_threshold=${staleness_threshold}
async_training.partial_rollout="${partial_rollout}"
async_training.trigger_parameter_sync_step="${trigger_parameter_sync_step}"
# GPU specific configurations
async_training.use_trainer_do_validate=${use_trainer_do_validate}
actor_rollout_ref.rollout.checkpoint_engine.backend='nccl'
actor_rollout_ref.rollout.checkpoint_engine.update_weights_bucket_megabytes=1024
)
Expand Down
4 changes: 2 additions & 2 deletions tests/special_npu/run_fully_async_policy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ common_params=(
rollout.nnodes=1
rollout.n_gpus_per_node=${n_gpus_rollout}
rollout.total_rollout_steps=${total_rollout_steps}
rollout.total_epochs=2
rollout.test_freq=${test_freq}
trainer.total_epochs=2
trainer.test_freq=${test_freq}
# Fully async specific configurations
async_training.staleness_threshold=${staleness_threshold}
async_training.partial_rollout="${partial_rollout}"
Expand Down
1 change: 0 additions & 1 deletion tests/special_sanity/check_device_api_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
NCCL_KEYWORD_CHECK_WHITELIST = [
"verl/utils/device.py",
"verl/third_party/sglang/parallel_state.py", # appear in default backend
"verl/recipe/fully_async_policy/param_sync.py", # fully_async_policy in default backend
]

SEARCH_WHITELIST = CUDA_KEYWORD_CHECK_WHITELIST + NCCL_KEYWORD_CHECK_WHITELIST
Expand Down
1 change: 1 addition & 0 deletions tests/special_sanity/check_pr_title.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
allowed_modules += ["tests", "training_utils", "recipe", "hardware", "deployment"]
allowed_modules += ["ray", "worker", "single_controller", "misc", "docker", "ci"]
allowed_modules += ["perf", "model", "algo", "env", "tool", "ckpt", "doc", "data", "cfg", "reward"]
allowed_modules += ["fully_async", "one_step_off"]
allowed_types = ["feat", "fix", "refactor", "chore", "test"]

# Check for [1/N] prefix and extract the rest of the title
Expand Down
7 changes: 3 additions & 4 deletions verl/experimental/agent_loop/agent_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async def generate(
"""
server_id, server = await self._acquire_server(request_id)
try:
output = await server.generate.remote(
output: TokenOutput = await server.generate.remote(
request_id=uuid4().hex, # use new request_id for each turn
prompt_ids=prompt_ids,
sampling_params=sampling_params,
Expand Down Expand Up @@ -839,9 +839,8 @@ def _postprocess(
default_extra_keys = {
"turn_scores",
"tool_rewards",
"is_cancel",
"param_version_start",
"param_version_end",
"min_global_steps",
"max_global_steps",
"extras",
}
all_keys = set(key for input_item in inputs for key in input_item.extra_fields) | default_extra_keys
Expand Down
Loading
Loading