Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions components/src/dynamo/common/tests/test_video_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _mock_iio_v2(self):
iio.get_writer = MagicMock(return_value=writer)
return iio, writer

def test_mp4_selects_libx264_codec(self):
def test_mp4_selects_h264_nvenc_codec(self):
from dynamo.common.utils.video_utils import encode_to_video_bytes

iio = self._mock_iio_v3()
Expand All @@ -56,7 +56,7 @@ def test_mp4_selects_libx264_codec(self):

iio.imwrite.assert_called_once()
_, kwargs = iio.imwrite.call_args
assert kwargs.get("codec") == "libx264"
assert kwargs.get("codec") == "h264_nvenc"
assert kwargs.get("fps") == 8

def test_webm_selects_libvpx_vp9_codec(self):
Expand Down Expand Up @@ -154,3 +154,67 @@ def test_v2_api_fallback_writes_all_frames(self):

assert writer.append_data.call_count == 4
writer.close.assert_called_once()


# ---------------------------------------------------------------------------
# normalize_image_frames
# ---------------------------------------------------------------------------


class TestNormalizeImageFrames:
    """Exercise normalize_image_frames(), which turns DiffusionFormatter image
    inputs into a flat list of PIL Images. Most image pipelines already hand
    back PIL Images, while the Cosmos3 native pipeline hands back a 5D numpy
    array laid out as ``[B, F, H, W, C]``."""

    def test_pil_inputs_returned_by_identity(self):
        """PIL Images come back as the very same objects (no copy, no convert)."""
        from PIL import Image

        from dynamo.common.utils.video_utils import normalize_image_frames

        red = Image.new("RGB", (4, 4), (255, 0, 0))
        green = Image.new("RGB", (4, 4), (0, 255, 0))

        result = normalize_image_frames([red, green])

        assert len(result) == 2
        first, second = result
        assert first is red
        assert second is green

    def test_uint8_hwc_numpy_preserves_pixels(self):
        """A uint8 HWC array becomes one PIL Image with unchanged pixel values."""
        from PIL import Image

        from dynamo.common.utils.video_utils import normalize_image_frames

        pixels = np.full((4, 4, 3), 7, dtype=np.uint8)

        result = normalize_image_frames([pixels])

        assert len(result) == 1
        image = result[0]
        assert isinstance(image, Image.Image)
        assert image.size == (4, 4)  # PIL reports (W, H)
        top_left = np.asarray(image)[0, 0]
        assert top_left.tolist() == [7, 7, 7]

    def test_cosmos3_5d_strips_batch_and_preserves_frame_order(self):
        """A [B, F, H, W, C] array yields F PIL frames in their original order.
        Each frame carries its own fill value so that indexing along the wrong
        axis would be caught."""
        from dynamo.common.utils.video_utils import normalize_image_frames

        fills = (10, 20, 30)
        batch = np.zeros((1, len(fills), 4, 4, 3), dtype=np.uint8)
        for frame_idx, fill in enumerate(fills):
            batch[0, frame_idx] = fill

        result = normalize_image_frames([batch])

        assert len(result) == 3
        for image, fill in zip(result, fills):
            assert np.asarray(image)[0, 0, 0] == fill

    def test_float_zero_to_one_scaled_to_uint8(self):
        """float32 data in [0, 1] is rescaled onto the uint8 range [0, 255]."""
        from dynamo.common.utils.video_utils import normalize_image_frames

        half_grey = np.full((4, 4, 3), 0.5, dtype=np.float32)

        result = normalize_image_frames([half_grey])

        # 0.5 * 255 = 127.5, and numpy rounds halves to the nearest even
        # integer, which gives exactly 128.
        converted = np.asarray(result[0])
        assert converted[0, 0, 0] == 128
40 changes: 35 additions & 5 deletions components/src/dynamo/common/utils/video_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,34 @@ def normalize_video_frames(images: list) -> list:
return list(frames)


def normalize_image_frames(images: list) -> list:
    """Normalize stage_output.images into a flat list of PIL Images.

    Image diffusion pipelines usually return PIL Images, but some (e.g. the
    Cosmos3 native pipeline) return numpy arrays shaped ``[batch, frames, H, W,
    C]`` even for single images. Every leading batch/frame dimension is
    flattened, so an array holding several batches contributes all of its
    frames, in row-major (batch first, then frame) order, rather than only the
    frames of the first batch.

    Args:
        images: PIL Images and/or array-likes. An array with four or more
            dimensions is read as ``[..., H, W, C]``; an array with fewer
            dimensions is treated as a single frame.

    Returns:
        A flat list of PIL Images. PIL inputs are returned as the same
        objects, without conversion or copy.
    """
    from PIL import Image

    out: list = []
    for item in images:
        if isinstance(item, Image.Image):
            out.append(item)
            continue

        arr = np.asarray(item)
        if arr.ndim >= 4:
            # [batch, frames, H, W, C] (or deeper) -> [N, H, W, C]. Indexing
            # arr[0] here would silently drop every batch after the first.
            frames = arr.reshape((-1, *arr.shape[-3:]))
        else:
            frames = arr[None]  # single frame -> [1, ...]

        if frames.shape[0] == 0:
            # Nothing to emit; also avoids max() on a zero-size array below.
            continue

        if frames.dtype != np.uint8:  # frames share a dtype/range; convert once
            if frames.max() <= 1.0:
                # Values look normalized to [0, 1]: rescale to [0, 255].
                frames = (frames.clip(0, 1) * 255).round()
            frames = frames.astype(np.uint8)

        out.extend(Image.fromarray(frame) for frame in frames)
    return out


def frames_to_numpy(images: list) -> np.ndarray:
"""Convert a list of PIL Images to a numpy array suitable for video encoding.

Expand Down Expand Up @@ -154,13 +182,15 @@ def encode_to_mp4(
logger.info(f"Encoding {len(frames)} frames to {output_path} at {fps} fps")

try:
# Use imageio to write MP4
# imageio.v3 API
# Use imageio to write MP4. We use h264_nvenc (NVIDIA HW encoder) instead
# of libx264 because the in-tree ffmpeg build is LGPL-only and libx264
# is GPL-licensed; see container/templates/wheel_builder.Dockerfile.
# Requires a CUDA-capable GPU at runtime.
if hasattr(iio, "imwrite"):
iio.imwrite(output_path, frames, fps=fps, codec="libx264")
iio.imwrite(output_path, frames, fps=fps, codec="h264_nvenc")
else:
# Fall back to v2 API
writer = iio.get_writer(output_path, fps=fps, codec="libx264") # type: ignore[attr-defined]
writer = iio.get_writer(output_path, fps=fps, codec="h264_nvenc") # type: ignore[attr-defined]
try:
for frame in frames:
writer.append_data(frame)
Expand Down Expand Up @@ -215,7 +245,7 @@ def encode_to_video_bytes(
if output_format == "webm":
kwargs["codec"] = "libvpx-vp9"
elif output_format == "mp4":
kwargs["codec"] = "libx264"
kwargs["codec"] = "h264_nvenc"
else:
raise ValueError(f"No codec specified for response format: {output_format}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async def _generate_video(
return video_bytes

async def _frames_to_video(
self, frames: list, fps: int, codec: str = "libx264"
self, frames: list, fps: int, codec: str = "h264_nvenc"
) -> bytes:
"""Convert list of frames to video bytes.

Expand Down
15 changes: 15 additions & 0 deletions components/src/dynamo/vllm/omni/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,16 @@ def add_arguments(self, parser) -> None:
default=False,
help="Disable torch.compile and force eager execution for diffusion models.",
)
add_negatable_bool_argument(
g,
flag_name="--cosmos3-guardrails",
env_var="DYN_OMNI_COSMOS3_GUARDRAILS",
default=True,
help=(
"Enable Cosmos3 text/video safety guardrails (loads guardrail models "
"at startup). Use --no-cosmos3-guardrails to disable."
),
)

# TTS parameters
tts_g = parser.add_argument_group(
Expand Down Expand Up @@ -333,6 +343,11 @@ class OmniConfig(DynamoRuntimeConfig):
stage_configs_path: Optional[str] = None
default_video_fps: int = 16

# Cosmos3 safety guardrails. When False, routed into
# od_config.model_config["guardrails"]=False so the diffusion engine skips
# loading the guardrail models (see base_handler._build_omni_kwargs).
cosmos3_guardrails: bool = True

# Nested structs — each group of fields has a clear destination
diffusion: OmniDiffusionKwargs = dataclasses.field(
default_factory=OmniDiffusionKwargs
Expand Down
6 changes: 6 additions & 0 deletions components/src/dynamo/vllm/omni/base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ def _build_omni_kwargs(self, config) -> Dict[str, Any]:
if config.stage_configs_path:
omni_kwargs["stage_configs_path"] = config.stage_configs_path

# Cosmos3 guardrails toggle -> od_config.model_config["guardrails"].
# Mirrors vllm-omni serve's --cosmos3-no-guardrails; when disabled the
# diffusion engine skips loading the guardrail models entirely.
if not config.cosmos3_guardrails:
omni_kwargs["model_config"] = {"guardrails": False}

for field, value in dataclasses.asdict(config.diffusion).items():
if value is not None:
omni_kwargs[field] = value
Expand Down
7 changes: 5 additions & 2 deletions components/src/dynamo/vllm/omni/output_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
from dynamo.common.storage import upload_to_fs
from dynamo.common.utils.engine_response import normalize_finish_reason
from dynamo.common.utils.output_modalities import RequestType
from dynamo.common.utils.video_utils import normalize_video_frames
from dynamo.common.utils.video_utils import (
normalize_image_frames,
normalize_video_frames,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -241,7 +244,7 @@ async def _prepare_images(
self, images: list, request_id: str, response_format: Optional[str] = None
) -> list:
outlist = []
for img in images:
for img in normalize_image_frames(images):
buf = BytesIO()
img.save(buf, format="PNG")
image_bytes = buf.getvalue()
Expand Down
14 changes: 14 additions & 0 deletions components/src/dynamo/vllm/tests/omni/test_omni_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def _make_omni_config(**overrides) -> OmniConfig:
"tts_ref_audio_max_bytes": 50 * 1024 * 1024,
"stage_id": None,
"omni_router": False,
"cosmos3_guardrails": True,
}
flat_defaults.update(flat_overrides)

Expand Down Expand Up @@ -191,3 +192,16 @@ def test_omni_config_imports_cleanly():

assert OmniConfig is not None
assert callable(parse_omni_args)


# --- Cosmos3 guardrails ---


def test_omni_config_cosmos3_guardrails_default_enabled():
    """The class-level default for ``cosmos3_guardrails`` is ``True``."""
    class_default = OmniConfig.cosmos3_guardrails
    assert class_default is True


def test_omni_config_cosmos3_guardrails_overridable():
    """Guardrails can be switched off, and the resulting config validates."""
    cfg = _make_omni_config(cosmos3_guardrails=False)

    assert cfg.cosmos3_guardrails is False
    # Turning guardrails off must still pass validation.
    cfg.validate()
22 changes: 22 additions & 0 deletions components/src/dynamo/vllm/tests/omni/test_omni_base_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,25 @@ def test_output_modalities_forwarded_to_async_omni(self):
kwargs = _build_kwargs(config)

assert kwargs["output_modalities"] == ["image"]


class TestCosmos3Guardrails:
    """With ``cosmos3_guardrails=False`` the built omni kwargs must carry
    ``model_config={"guardrails": False}``. With the default (True) no
    ``model_config`` key is added, leaving vllm-omni to apply its own
    default."""

    def test_disabled_routes_into_model_config(self):
        """Disabling guardrails adds the model_config override."""
        cfg = _make_config()
        cfg.cosmos3_guardrails = False

        omni_kwargs = _build_kwargs(cfg)

        model_config = omni_kwargs.get("model_config")
        assert model_config == {"guardrails": False}

    def test_enabled_does_not_set_model_config(self):
        """Leaving guardrails on adds no model_config key at all."""
        cfg = _make_config()
        cfg.cosmos3_guardrails = True

        omni_kwargs = _build_kwargs(cfg)

        assert "model_config" not in omni_kwargs
39 changes: 17 additions & 22 deletions components/src/dynamo/vllm/tests/omni/test_output_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,36 +115,39 @@ def _make_diffusion_formatter():
)


def _make_pil_image(size=(4, 4)):
    """Return a real solid-colour RGB PIL image of the given ``size``.

    A genuine PIL image is required here: normalize_image_frames() passes PIL
    inputs through unchanged, whereas a MagicMock falls into the
    np.asarray(item).max() path and raises "zero-size array to reduction
    operation maximum".
    """
    from PIL import Image

    fill_rgb = (123, 222, 64)
    return Image.new("RGB", size, fill_rgb)


class TestDiffusionFormatterPrepareImages:
@pytest.mark.asyncio
async def test_b64_json(self):
f = _make_diffusion_formatter()
img = MagicMock()
img.save = lambda b, format: b.write(b"fake_png_data")
results = await f._prepare_images([img], "req-1", "b64_json")
results = await f._prepare_images([_make_pil_image()], "req-1", "b64_json")
assert len(results) == 1
assert results[0].startswith("data:image/png;base64,")

@pytest.mark.asyncio
async def test_b64_default_when_none(self):
f = _make_diffusion_formatter()
img = MagicMock()
img.save = lambda b, format: b.write(b"data")
results = await f._prepare_images([img], "req-1", None)
results = await f._prepare_images([_make_pil_image()], "req-1", None)
assert results[0].startswith("data:image/png;base64,")

@pytest.mark.asyncio
async def test_invalid_format(self):
f = _make_diffusion_formatter()
with pytest.raises(ValueError, match="Invalid response format"):
await f._prepare_images([MagicMock()], "req-1", "invalid")
await f._prepare_images([_make_pil_image()], "req-1", "invalid")

@pytest.mark.asyncio
async def test_multiple_images(self):
f = _make_diffusion_formatter()
imgs = [MagicMock() for _ in range(3)]
for img in imgs:
img.save = lambda b, format: b.write(b"px")
imgs = [_make_pil_image() for _ in range(3)]
results = await f._prepare_images(imgs, "req-1", "b64_json")
assert len(results) == 3

Expand All @@ -155,10 +158,8 @@ async def test_chat_completion_format(self):
from dynamo.common.utils.output_modalities import RequestType

f = _make_diffusion_formatter()
img = MagicMock()
img.save = lambda b, format: b.write(b"px")
chunk = await f._encode_image(
[img], "req-1", request_type=RequestType.CHAT_COMPLETION
[_make_pil_image()], "req-1", request_type=RequestType.CHAT_COMPLETION
)
assert chunk["object"] == "chat.completion.chunk"
assert chunk["choices"][0]["delta"]["content"][0]["type"] == "image_url"
Expand All @@ -168,10 +169,8 @@ async def test_image_generation_b64_format(self):
from dynamo.common.utils.output_modalities import RequestType

f = _make_diffusion_formatter()
img = MagicMock()
img.save = lambda b, format: b.write(b"px")
chunk = await f._encode_image(
[img],
[_make_pil_image()],
"req-1",
response_format="b64_json",
request_type=RequestType.IMAGE_GENERATION,
Expand All @@ -183,10 +182,8 @@ async def test_image_generation_default_format_returns_b64(self):
from dynamo.common.utils.output_modalities import RequestType

f = _make_diffusion_formatter()
img = MagicMock()
img.save = lambda b, format: b.write(b"px")
chunk = await f._encode_image(
[img],
[_make_pil_image()],
"req-1",
response_format=None,
request_type=RequestType.IMAGE_GENERATION,
Expand Down Expand Up @@ -381,9 +378,7 @@ async def test_routes_image(self):
f = OutputFormatter(model_name="test-model")
stage = MagicMock()
stage.final_output_type = "image"
img = MagicMock()
img.save = lambda b, format: b.write(b"px")
stage.images = [img]
stage.images = [_make_pil_image()]
chunk = await f.format(
stage, "req-1", request_type=RequestType.CHAT_COMPLETION, **self._FULL_CTX
)
Expand Down
Loading
Loading