diff --git a/tests/unit_tests/models/qwen_omni/modeling_qwen25_omni/test_omni_model.py b/tests/unit_tests/models/qwen_omni/modeling_qwen25_omni/test_omni_model.py
index 19111b2c75..cdf31bb00e 100644
--- a/tests/unit_tests/models/qwen_omni/modeling_qwen25_omni/test_omni_model.py
+++ b/tests/unit_tests/models/qwen_omni/modeling_qwen25_omni/test_omni_model.py
@@ -15,14 +15,12 @@
 """
 Unit tests for Qwen2.5 Omni Model implementation.
 
-Run with: torchrun --nproc_per_node=8 -m pytest tests/unit_tests/models/qwen_omni/modeling_qwen25_omni/test_omni_model.py
-Or for single GPU: pytest tests/unit_tests/models/qwen_omni/modeling_qwen25_omni/test_omni_model.py
+Uses toy-sized configs to keep tests fast (<5s each).
 """
 
 import datetime
 import os
 
-import numpy as np
 import pytest
 import torch
 import torch.distributed as dist
@@ -31,43 +29,69 @@
 from megatron.core.models.gpt.gpt_layer_specs import get_gpt_layer_with_transformer_engine_spec
 from megatron.core.process_groups_config import ProcessGroupCollection
 from megatron.core.tensor_parallel.random import model_parallel_cuda_manual_seed
-from transformers import AutoConfig, AutoProcessor
+from transformers.models.qwen2_5_omni.configuration_qwen2_5_omni import (
+    Qwen2_5OmniAudioEncoderConfig,
+    Qwen2_5OmniThinkerConfig,
+    Qwen2_5OmniVisionEncoderConfig,
+)
 
 from megatron.bridge.models.qwen_omni.modeling_qwen25_omni.model import Qwen25OmniModel
 from megatron.bridge.models.qwen_omni.modeling_qwen25_omni.transformer_config import Qwen25OmniTransformerConfig
 
 
-@pytest.fixture(scope="module")
-def processor():
-    """Load HuggingFace processor once for all tests."""
-    return AutoProcessor.from_pretrained("Qwen/Qwen2.5-Omni-7B")
-
-
-@pytest.fixture(scope="module")
-def hf_config():
-    """Load HuggingFace config once for all tests."""
-    return AutoConfig.from_pretrained("Qwen/Qwen2.5-Omni-7B")
-
-
-@pytest.fixture
-def random_image():
-    """Generate a random PIL image."""
-    return np.random.randint(0, 255, size=(24, 24, 3), dtype=np.uint8)
+HIDDEN_SIZE = 128
 
 
-@pytest.fixture
-def random_video():
-    """Generate a random video."""
-    return np.random.randint(0, 255, size=(2, 3, 24, 44), dtype=np.uint8)
+def _make_toy_thinker_config():
+    """Build a tiny Qwen2.5-Omni thinker config for fast unit tests."""
+    vision_config = Qwen2_5OmniVisionEncoderConfig(
+        depth=2,
+        hidden_size=64,
+        num_heads=4,
+        intermediate_size=128,
+        in_channels=3,
+        patch_size=14,
+        spatial_merge_size=2,
+        temporal_patch_size=2,
+        window_size=112,
+        out_hidden_size=HIDDEN_SIZE,
+        fullatt_block_indexes=[1],
+    )
+    audio_config = Qwen2_5OmniAudioEncoderConfig(
+        encoder_layers=2,
+        encoder_attention_heads=4,
+        encoder_ffn_dim=128,
+        d_model=64,
+        num_mel_bins=32,
+        output_dim=HIDDEN_SIZE,
+        max_source_positions=100,
+        n_window=10,
+    )
+    return Qwen2_5OmniThinkerConfig(
+        vision_config=vision_config,
+        audio_config=audio_config,
+        text_config={
+            "hidden_size": HIDDEN_SIZE,
+            "intermediate_size": 256,
+            "num_hidden_layers": 2,
+            "num_attention_heads": 4,
+            "num_key_value_heads": 2,
+            "vocab_size": 1000,
+            "max_position_embeddings": 256,
+            "rms_norm_eps": 1e-6,
+            "attention_dropout": 0.0,
+            "rope_scaling": {"mrope_section": [4, 6, 6], "type": "mrope"},
+        },
+    )
 
 
-@pytest.fixture
-def random_audio():
-    """Generate a random audio."""
-    return np.random.randint(-1, 32767, size=(800), dtype=np.int16)
+@pytest.fixture(scope="module")
+def thinker_config():
+    """Toy thinker config shared across all tests in this module."""
+    return _make_toy_thinker_config()
 
 
-@pytest.mark.pleasefixme  # TODO: replace full-size HF vision/audio encoders with dummy stubs to avoid timeout
+@pytest.mark.timeout(30)
 class TestQwen25OmniModel:
     """Test suite for Qwen2.5 Omni Model."""
 
@@ -99,14 +123,7 @@ def teardown_class(cls):
             dist.destroy_process_group()
 
     def _setup_parallel_state(self, tp_size=1, pp_size=1, cp_size=1):
-        """Setup Megatron parallel state with specified parallelism configuration.
-
-        Args:
-            tp_size: Tensor model parallel size
-            pp_size: Pipeline model parallel size
-            cp_size: Context parallel size
-        """
-        # Clean up any existing parallel state before initializing
+        """Setup Megatron parallel state."""
         if parallel_state.model_parallel_is_initialized():
             parallel_state.destroy_model_parallel()
 
@@ -124,188 +141,75 @@ def teardown_method(self):
             parallel_state.destroy_model_parallel()
 
     @staticmethod
-    def get_thinker_transformer_config(hf_config):
-        """Create a thinker transformer config for testing.
-
-        Returns:
-            Qwen2_5OmniThinkerConfig: HF configuration for the thinker model.
-        """
-        return hf_config.thinker_config
+    def _make_language_config(thinker_config):
+        """Create a Qwen25OmniTransformerConfig from the toy thinker config."""
+        text_cfg = thinker_config.text_config
+        vision_cfg = thinker_config.vision_config
 
-    @staticmethod
-    def get_language_transformer_config(hf_config):
-        """Create a language transformer config for testing.
-
-        Uses actual Qwen2.5-Omni-7B model sizes to ensure compatibility
-        with the vision/audio model outputs.
-
-        Args:
-            hf_config: HuggingFace config object.
-
-        Returns:
-            Qwen25OmniTransformerConfig: Configuration for the language model.
-        """
-        thinker_config = hf_config.thinker_config
-        rope_scaling = getattr(thinker_config.text_config, "rope_scaling", None)
-        if rope_scaling is not None:
-            mrope_section = rope_scaling.get("mrope_section", [16, 24, 24])
-        else:
-            mrope_section = [16, 24, 24]
+        rope_scaling = getattr(text_cfg, "rope_scaling", None)
+        mrope_section = (rope_scaling or {}).get("mrope_section", [4, 6, 6])
 
         return Qwen25OmniTransformerConfig(
-            # Use actual model dimensions from HF config
-            num_layers=4,  # Reduced for testing (actual: thinker_config.text_config.num_hidden_layers)
-            hidden_size=thinker_config.text_config.hidden_size,
-            num_attention_heads=thinker_config.text_config.num_attention_heads,
-            num_query_groups=thinker_config.text_config.num_key_value_heads,
-            kv_channels=thinker_config.text_config.hidden_size // thinker_config.text_config.num_attention_heads,
-            ffn_hidden_size=thinker_config.text_config.intermediate_size,
-            # Qwen2.5-Omni specific
-            vocab_size=thinker_config.text_config.vocab_size,
-            language_max_sequence_length=thinker_config.text_config.max_position_embeddings,
-            # Vision parameters
-            patch_size=thinker_config.vision_config.patch_size,
-            temporal_patch_size=thinker_config.vision_config.temporal_patch_size,
-            in_channels=thinker_config.vision_config.in_channels,
-            spatial_merge_size=thinker_config.vision_config.spatial_merge_size,
-            # RoPE settings
-            rotary_base=getattr(thinker_config.text_config, "rope_theta", 1000000),
+            num_layers=2,
+            hidden_size=text_cfg.hidden_size,
+            num_attention_heads=text_cfg.num_attention_heads,
+            num_query_groups=text_cfg.num_key_value_heads,
+            kv_channels=text_cfg.hidden_size // text_cfg.num_attention_heads,
+            ffn_hidden_size=text_cfg.intermediate_size,
+            vocab_size=text_cfg.vocab_size,
+            language_max_sequence_length=text_cfg.max_position_embeddings,
+            patch_size=vision_cfg.patch_size,
+            temporal_patch_size=vision_cfg.temporal_patch_size,
+            in_channels=vision_cfg.in_channels,
+            spatial_merge_size=vision_cfg.spatial_merge_size,
+            rotary_base=getattr(text_cfg, "rope_theta", 1000000),
             rotary_percent=1.0,
             mrope_section=mrope_section,
-            # Training settings
             normalization="RMSNorm",
            activation_func=F.silu,
             gated_linear_unit=True,
             add_bias_linear=False,
             add_qkv_bias=True,
             qk_layernorm=False,
-            layernorm_epsilon=thinker_config.text_config.rms_norm_eps,
+            layernorm_epsilon=text_cfg.rms_norm_eps,
             bf16=False,
             use_cpu_initialization=True,
             hidden_dropout=0.0,
-            attention_dropout=thinker_config.text_config.attention_dropout,
+            attention_dropout=text_cfg.attention_dropout,
         )
 
     @staticmethod
-    def get_language_model_layer_spec():
-        """Create a GPT layer spec for the language model (dense, no MoE).
-
-        Returns:
-            ModuleSpec: Layer specification for transformer layers.
-        """
-        language_model_layer_spec = get_gpt_layer_with_transformer_engine_spec(
+    def _make_layer_spec():
+        """Create a GPT layer spec for the language model."""
+        return get_gpt_layer_with_transformer_engine_spec(
            num_experts=None,
            moe_grouped_gemm=False,
            qk_layernorm=False,
            fp8=False,
        )
-        return language_model_layer_spec
 
-    @staticmethod
-    def get_data_batch(processor, random_image, random_video, random_audio):
-        """Generate a batch of data for model forward pass.
-
-        Args:
-            processor: HuggingFace processor.
-            random_image: Random PIL image.
-            random_video: Random video.
-            random_audio: Random audio.
-        Returns:
-            dict: A dictionary containing all inputs needed for model forward pass:
-                - input_ids: Token IDs [batch, seq_len]
-                - attention_mask: Attention mask [batch, seq_len]
-                - pixel_values: Image pixel values [batch, channels, height, width]
-                - image_grid_thw: Image grid dimensions [num_images, 3] (temporal, height, width)
-                - pixel_values_videos: Video pixel values (None for images only)
-                - video_grid_thw: Video grid dimensions (None for images only)
-                - input_features: Audio values (None if no audio)
-                - feature_attention_mask: Audio attention mask (None if no audio)
-                - video_second_per_grid: Video seconds per grid
-        """
-        # Create a sample message with image and text
-        messages = [
-            {
-                "role": "user",
-                "content": [
-                    {
-                        "type": "image",
-                        "image": random_image,
-                    },
-                    {
-                        "type": "video",
-                        "video": random_video,
-                    },
-                    {
-                        "type": "audio",
-                        "audio": random_audio,
-                    },
-                    {"type": "text", "text": "Describe this image, video and audio."},
-                ],
-            }
-        ]
-
-        # Process inputs using HuggingFace processor
-        inputs = processor.apply_chat_template(
-            messages,
-            tokenize=True,
-            add_generation_prompt=True,
-            return_dict=True,
-            return_tensors="pt",
-        )
-
-        batch = {
-            "input_ids": inputs["input_ids"],
-            "attention_mask": inputs.get("attention_mask"),
-            "pixel_values": inputs.get("pixel_values"),
-            "image_grid_thw": inputs.get("image_grid_thw"),
-            "pixel_values_videos": inputs.get("pixel_values_videos"),
-            "video_grid_thw": inputs.get("video_grid_thw"),
-            "input_features": inputs.get("input_features"),
-            "feature_attention_mask": inputs.get("feature_attention_mask"),
-            "video_second_per_grid": inputs.get("video_second_per_grid"),
-            "position_ids": None,
-            "labels": None,
-        }
-
-        # Move tensors to CUDA if available
-        if torch.cuda.is_available():
-            for key, value in batch.items():
-                if value is not None and isinstance(value, torch.Tensor):
-                    batch[key] = value.cuda()
-
-        return batch
-
-    @pytest.mark.timeout(50)
-    @pytest.mark.parametrize(
-        "freeze_all",
-        [True, False],
-    )
-    def test_model_freeze_api(self, freeze_all, hf_config):
-        """Test model freeze API."""
+    def _build_model(self, thinker_config, pre_process=True, post_process=True, add_encoder=True, add_decoder=True):
+        """Helper to build a Qwen25OmniModel with the given flags."""
         self._setup_parallel_state(tp_size=1, pp_size=1)
         pg_collection = ProcessGroupCollection.use_mpu_process_groups()
-        assert pg_collection is not None
-        assert pg_collection.tp is not None
-        assert pg_collection.pp is not None
-        assert pg_collection.cp is not None
-        assert pg_collection.embd is not None
-
-        language_transformer_config = self.get_language_transformer_config(hf_config)
-        language_model_layer_spec = self.get_language_model_layer_spec()
-        thinker_transformer_config = self.get_thinker_transformer_config(hf_config)
-
-        model = Qwen25OmniModel(
-            language_transformer_config=language_transformer_config,
-            language_transformer_layer_spec=language_model_layer_spec,
-            thinker_transformer_config=thinker_transformer_config,
+
+        return Qwen25OmniModel(
+            language_transformer_config=self._make_language_config(thinker_config),
+            language_transformer_layer_spec=self._make_layer_spec(),
+            thinker_transformer_config=thinker_config,
             parallel_output=True,
-            pre_process=True,
-            post_process=True,
-            add_encoder=True,
-            add_decoder=True,
+            pre_process=pre_process,
+            post_process=post_process,
+            add_encoder=add_encoder,
+            add_decoder=add_decoder,
             pg_collection=pg_collection,
         )
 
+    @pytest.mark.parametrize("freeze_all", [True, False])
+    def test_model_freeze_api(self, freeze_all, thinker_config):
+        """Test model freeze API."""
+        model = self._build_model(thinker_config)
+
         if torch.cuda.is_available():
             model.to("cuda")
 
@@ -318,99 +222,23 @@ def test_model_freeze_api(self, freeze_all, hf_config):
         for name, param in model.named_parameters():
             assert param.requires_grad != freeze_all, f"{name=}"
 
-    @pytest.mark.timeout(50)
-    def test_shared_embedding_or_output_weight(self, hf_config):
+    def test_shared_embedding_or_output_weight(self, thinker_config):
         """Test shared_embedding_or_output_weight method."""
-        self._setup_parallel_state(tp_size=1, pp_size=1)
-        pg_collection = ProcessGroupCollection.use_mpu_process_groups()
-        assert pg_collection is not None
-        assert pg_collection.tp is not None
-        assert pg_collection.pp is not None
-        assert pg_collection.cp is not None
-        assert pg_collection.embd is not None
-
-        language_transformer_config = self.get_language_transformer_config(hf_config)
-        language_model_layer_spec = self.get_language_model_layer_spec()
-        thinker_transformer_config = self.get_thinker_transformer_config(hf_config)
-
-        # Test with add_decoder=True
-        model = Qwen25OmniModel(
-            language_transformer_config=language_transformer_config,
-            language_transformer_layer_spec=language_model_layer_spec,
-            thinker_transformer_config=thinker_transformer_config,
-            parallel_output=True,
-            pre_process=True,
-            post_process=True,
-            add_encoder=True,
-            add_decoder=True,
-            pg_collection=pg_collection,
-        )
+        model = self._build_model(thinker_config, add_decoder=True)
         weight = model.shared_embedding_or_output_weight()
         assert weight is not None
 
-        # Test with add_decoder=False
-        model = Qwen25OmniModel(
-            language_transformer_config=language_transformer_config,
-            language_transformer_layer_spec=language_model_layer_spec,
-            thinker_transformer_config=thinker_transformer_config,
-            parallel_output=True,
-            pre_process=True,
-            post_process=True,
-            add_encoder=True,
-            add_decoder=False,
-            pg_collection=pg_collection,
-        )
+        model = self._build_model(thinker_config, add_decoder=False)
         weight_no_decoder = model.shared_embedding_or_output_weight()
         assert weight_no_decoder is None
 
-    @pytest.mark.timeout(50)
-    def test_set_input_tensor(self, hf_config):
+    def test_set_input_tensor(self, thinker_config):
         """Test set_input_tensor method."""
-        self._setup_parallel_state(tp_size=1, pp_size=1)
-        pg_collection = ProcessGroupCollection.use_mpu_process_groups()
-        assert pg_collection is not None
-        assert pg_collection.tp is not None
-        assert pg_collection.pp is not None
-        assert pg_collection.cp is not None
-        assert pg_collection.embd is not None
-
-        language_transformer_config = self.get_language_transformer_config(hf_config)
-        language_model_layer_spec = self.get_language_model_layer_spec()
-        thinker_transformer_config = self.get_thinker_transformer_config(hf_config)
-        hidden_size = language_transformer_config.hidden_size
-
-        # Test with pre_process=True
-        model = Qwen25OmniModel(
-            language_transformer_config=language_transformer_config,
-            language_transformer_layer_spec=language_model_layer_spec,
-            thinker_transformer_config=thinker_transformer_config,
-            parallel_output=True,
-            pre_process=True,
-            post_process=True,
-            add_encoder=True,
-            add_decoder=True,
-            pg_collection=pg_collection,
-        )
+        model = self._build_model(thinker_config, pre_process=True)
+        test_tensor = torch.randn(2, 4, HIDDEN_SIZE)
 
-        test_tensor = torch.randn(2, 4, hidden_size)
-
-        # Test with single tensor (not a list)
         model.set_input_tensor([test_tensor])
         assert model.thinker.encoder_hidden_state is not None
 
-        # Test with pre_process=False
-        model = Qwen25OmniModel(
-            language_transformer_config=language_transformer_config,
-            language_transformer_layer_spec=language_model_layer_spec,
-            thinker_transformer_config=thinker_transformer_config,
-            parallel_output=True,
-            pre_process=False,
-            post_process=True,
-            add_encoder=True,
-            add_decoder=True,
-            pg_collection=pg_collection,
-        )
-
-        # This should set the input tensor on the language model instead
+        model = self._build_model(thinker_config, pre_process=False)
         model.set_input_tensor([test_tensor])
-        # No assertion here as it sets internal state