diff --git a/vllm_omni/diffusion/model_loader/hub_prefetch.py b/vllm_omni/diffusion/model_loader/hub_prefetch.py index d60e1734e56..a33dca74d24 100644 --- a/vllm_omni/diffusion/model_loader/hub_prefetch.py +++ b/vllm_omni/diffusion/model_loader/hub_prefetch.py @@ -65,10 +65,22 @@ import logging import os import time -from collections.abc import Iterable, Iterator +from collections.abc import Callable, Iterable, Iterator +from typing import Any logger = logging.getLogger(__name__) +# A racy / partially-evicted HF cache (the exact failure this module defends +# against) is transient: re-running ``snapshot_download`` blocks on the peer +# writer's per-blob ``.lock`` and then returns a complete tree. So a bounded +# retry with linear backoff is what actually closes the window that a single +# best-effort attempt left open (Buildkite vllm-omni-rebase #1858: both the +# ``cuda_ti2v_hsdp`` missing-shard ``OSError`` and the ``wan_2_1_vace`` default +# ``UMT5Config`` size-mismatch were a swallowed prefetch followed by a +# ``from_pretrained`` against the half-written cache). +_PREFETCH_MAX_ATTEMPTS = 3 +_PREFETCH_BACKOFF_BASE_S = 1.0 + def _node_lock_dir() -> str: """Return a node-local directory suitable for the prefetch lock files. @@ -276,7 +288,11 @@ def prefetch_subfolders( if local_files_only or not model or os.path.isdir(model): return - logger.info("Prefetching %s subfolders: %s", model, list(subfolders)) + # Materialise ``subfolders`` up-front: it may be a one-shot generator and + # we reference it again in the retry / logging paths below. + subfolders = list(subfolders) + + logger.info("Prefetching %s subfolders: %s", model, subfolders) try: from huggingface_hub import snapshot_download @@ -306,39 +322,65 @@ def prefetch_subfolders( # snapshot is now warm. This is what makes the prefetch race-free # even when many ``DiffusionWorker`` subprocesses (or multiple # OmniServer instances on the same node) hit this code in parallel. 
- try: - with _repo_prefetch_lock(model): - snapshot_download( - repo_id=model, - allow_patterns=allow_patterns, - ) - logger.info("Prefetch complete for %s", model) - except Exception as exc: - # Best-effort: propagate only via logging. The subsequent - # ``from_pretrained`` call will raise a clearer, call-site-specific - # error (auth, 404, disk full, ...) that we'd rather surface - EXCEPT - # for auth/gating, which we escalate here with an explicit hint so - # readers of CI logs don't have to correlate the generic "OSError: - # does not appear to have a file named ..." that - # ``from_pretrained`` would otherwise emit much later with an - # unrelated-looking message. - if _looks_like_auth_error(exc): - logger.error( - "Hub prefetch for '%s' failed with an authentication / gated " - "repository error (%s: %s). The CI HF_TOKEN must (1) be set " - "in the step env, (2) be valid, and (3) belong to an account " - "that has accepted the model license on huggingface.co. See " - "docs/contributing/ci/hf_credentials.md.", - model, - type(exc).__name__, - exc, - ) - else: + # A single best-effort attempt is not enough: when several diffusion + # workers race a cold cache (and the node-wide lock fails to serialise + # them, as observed for HSDP / ring launches), ``snapshot_download`` can + # raise on a half-written tree. Swallowing that and proceeding straight + # to ``from_pretrained`` is exactly what turned a recoverable prefetch + # hiccup into a hard server crash. Retry with backoff so the snapshot + # actually completes before any loader reads the cache. 
+ for attempt in range(1, _PREFETCH_MAX_ATTEMPTS + 1): + try: + with _repo_prefetch_lock(model): + snapshot_download( + repo_id=model, + allow_patterns=allow_patterns, + ) + logger.info("Prefetch complete for %s", model) + return + except Exception as exc: + # Auth / gating never heals on retry - escalate immediately with + # an explicit hint so readers of CI logs don't have to correlate + # the generic "OSError: does not appear to have a file + # named ..." that ``from_pretrained`` would otherwise emit later. + if _looks_like_auth_error(exc): + logger.error( + "Hub prefetch for '%s' failed with an authentication / gated " + "repository error (%s: %s). The CI HF_TOKEN must (1) be set " + "in the step env, (2) be valid, and (3) belong to an account " + "that has accepted the model license on huggingface.co. See " + "docs/contributing/ci/hf_credentials.md.", + model, + type(exc).__name__, + exc, + ) + return + + if attempt < _PREFETCH_MAX_ATTEMPTS: + backoff = _PREFETCH_BACKOFF_BASE_S * attempt + logger.warning( + "Hub prefetch for repo '%s' subfolders %s failed on attempt %d/%d (%s: %s); retrying in %.1fs", + model, + subfolders, + attempt, + _PREFETCH_MAX_ATTEMPTS, + type(exc).__name__, + exc, + backoff, + ) + time.sleep(backoff) + continue + + # Exhausted retries. Stay best-effort: propagate only via logging + # so the subsequent ``from_pretrained`` call surfaces the real, + # call-site-specific error (and ``from_pretrained_with_prefetch`` + # gets a final chance to heal the cache). 
logger.warning( - "Hub prefetch for repo '%s' subfolders %s failed (%s: %s); " - "falling back to on-demand download in from_pretrained", + "Hub prefetch for repo '%s' subfolders %s failed after %d attempts " + "(%s: %s); falling back to on-demand download in from_pretrained", model, - list(subfolders), + subfolders, + _PREFETCH_MAX_ATTEMPTS, type(exc).__name__, exc, ) @@ -376,6 +418,78 @@ def _looks_like_auth_error(exc: BaseException) -> bool: return "401 client error" in msg or "403 client error" in msg or "gatedrepo" in msg +def from_pretrained_with_prefetch( + factory: Callable[..., Any], + model: str, + *, + subfolder: str, + prefetch_list: Iterable[str], + local_files_only: bool = False, + max_attempts: int = _PREFETCH_MAX_ATTEMPTS, + **from_pretrained_kwargs: Any, +) -> Any: + """Call ``factory``, re-prefetching to heal a racy / partial HF cache. + + ``factory`` is a bound ``SomeModel.from_pretrained`` (or any callable with + the same ``(model, *, subfolder, local_files_only, **kwargs)`` signature). + + This is a stronger sibling of :func:`retry_on_missing_shard`: that helper + only retries the missing-shard ``OSError`` and never re-prefetches, so it + cannot recover the second face of the same race. Two shapes of + partial-cache failure crash the diffusion server outright: + + * ``OSError: does not appear to have a file named + text_encoder/model-0000X-of-0000Y.safetensors`` - a shard is still under + its ``.incomplete`` name. + * ``RuntimeError: You set 'ignore_mismatched_sizes' to 'False' ...`` - + ``text_encoder/config.json`` was not present yet, so ``transformers`` v5 + silently fell back to the default (tiny) config and then could not load + the real checkpoint into it. + + Both heal once the cache is complete, so on any ``OSError`` / ``RuntimeError`` / + ``ValueError`` from ``factory`` we re-run the prefetch (which blocks on the peer + writer and retries the download) and reload, instead of letting the worker die.
Local paths and + ``local_files_only`` loads cannot be healed by re-fetching, so they raise + on the first failure exactly as before. + """ + prefetch_list = list(prefetch_list) + can_heal = not local_files_only and bool(model) and not os.path.isdir(model) + last_exc: BaseException | None = None + + for attempt in range(1, max_attempts + 1): + try: + return factory( + model, + subfolder=subfolder, + local_files_only=local_files_only, + **from_pretrained_kwargs, + ) + except (OSError, RuntimeError, ValueError) as exc: + last_exc = exc + if not can_heal or attempt >= max_attempts: + break + backoff = _PREFETCH_BACKOFF_BASE_S * attempt + logger.warning( + "from_pretrained(%s, subfolder=%s) failed on attempt %d/%d " + "(%s: %s); re-prefetching repo and retrying in %.1fs", + model, + subfolder, + attempt, + max_attempts, + type(exc).__name__, + exc, + backoff, + ) + time.sleep(backoff) + # Force a fresh, verified snapshot of every component this pipeline + # needs - not just ``subfolder`` - so a sibling component that was + # also half-written gets repaired in the same pass. + prefetch_subfolders(model, prefetch_list, local_files_only=False) + + assert last_exc is not None # loop only exits via return or a caught exc + raise last_exc + + def retry_on_missing_shard(load_fn, *, max_retries: int = 3, base_delay: float = 5.0): """Call *load_fn* with retry on the transformers v5 shard-resolution race. 
diff --git a/vllm_omni/diffusion/models/flux2_klein/pipeline_flux2_klein.py b/vllm_omni/diffusion/models/flux2_klein/pipeline_flux2_klein.py index 9cb81fd37d6..aea6d7e03fd 100644 --- a/vllm_omni/diffusion/models/flux2_klein/pipeline_flux2_klein.py +++ b/vllm_omni/diffusion/models/flux2_klein/pipeline_flux2_klein.py @@ -39,7 +39,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.flux2_klein.flux2_klein_transformer import ( Flux2Transformer2DModel, ) @@ -213,9 +213,10 @@ def __init__( # Avoid the transformers v5 multi-worker subfolder race (see # ``vllm_omni/diffusion/model_loader/hub_prefetch.py`` for the full # analysis; L4 build #1043 hit this on FLUX.2-klein-4B's text_encoder). + flux2_subfolders = ["scheduler", "text_encoder", "tokenizer", "vae"] prefetch_subfolders( model, - ["scheduler", "text_encoder", "tokenizer", "vae"], + flux2_subfolders, local_files_only=local_files_only, ) @@ -224,9 +225,15 @@ def __init__( subfolder="scheduler", local_files_only=local_files_only, ) - self.text_encoder = Qwen3ForCausalLM.from_pretrained( + # ``from_pretrained_with_prefetch`` re-prefetches and retries if a peer + # worker left the cache half-written (missing-shard ``OSError`` or the + # default-config size-mismatch ``RuntimeError``) instead of crashing + # the worker - FLUX.2-klein-4B's sharded text_encoder hit this on #1043. 
+ self.text_encoder = from_pretrained_with_prefetch( + Qwen3ForCausalLM.from_pretrained, model, subfolder="text_encoder", + prefetch_list=flux2_subfolders, local_files_only=local_files_only, ).to(self._execution_device) self.tokenizer = Qwen2TokenizerFast.from_pretrained( @@ -234,9 +241,11 @@ def __init__( subfolder="tokenizer", local_files_only=local_files_only, ) - self.vae = AutoencoderKLFlux2.from_pretrained( + self.vae = from_pretrained_with_prefetch( + AutoencoderKLFlux2.from_pretrained, model, subfolder="vae", + prefetch_list=flux2_subfolders, local_files_only=local_files_only, ).to(self._execution_device) diff --git a/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_i1_image.py b/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_i1_image.py index 39869a00d54..abc50eb8a22 100644 --- a/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_i1_image.py +++ b/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_i1_image.py @@ -31,6 +31,7 @@ from vllm_omni.diffusion.distributed.parallel_state import get_classifier_free_guidance_world_size from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.hidream_image import HiDreamImageTransformer2DModel from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin from vllm_omni.diffusion.profiler.diffusion_pipeline_profiler import DiffusionPipelineProfilerMixin @@ -177,24 +178,55 @@ def __init__( # Check if model is a local path local_files_only = os.path.exists(model) + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the in-repo component set before any from_pretrained + # (``text_encoder_4`` lives in a separate Llama repo and is unaffected). 
+ hidream_subfolders = [ + "scheduler", + "vae", + "text_encoder", + "tokenizer", + "text_encoder_2", + "tokenizer_2", + "text_encoder_3", + "tokenizer_3", + ] + prefetch_subfolders(model, hidream_subfolders, local_files_only=local_files_only) + self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.vae = AutoencoderKL.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) - self.text_encoder = CLIPTextModelWithProjection.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + AutoencoderKL.from_pretrained, + model, + subfolder="vae", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, + ).to(self.device) + self.text_encoder = from_pretrained_with_prefetch( + CLIPTextModelWithProjection.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, ) self.tokenizer = CLIPTokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.text_encoder_2 = CLIPTextModelWithProjection.from_pretrained( - model, subfolder="text_encoder_2", local_files_only=local_files_only + self.text_encoder_2 = from_pretrained_with_prefetch( + CLIPTextModelWithProjection.from_pretrained, + model, + subfolder="text_encoder_2", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, ) self.tokenizer_2 = CLIPTokenizer.from_pretrained( model, subfolder="tokenizer_2", local_files_only=local_files_only ) - self.text_encoder_3 = T5EncoderModel.from_pretrained( - model, subfolder="text_encoder_3", local_files_only=local_files_only + self.text_encoder_3 = from_pretrained_with_prefetch( + T5EncoderModel.from_pretrained, + model, + subfolder="text_encoder_3", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, ) self.tokenizer_3 = 
T5Tokenizer.from_pretrained( model, subfolder="tokenizer_3", local_files_only=local_files_only diff --git a/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_image.py b/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_image.py index 0c5486ae588..135e21e1a73 100644 --- a/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_image.py +++ b/vllm_omni/diffusion/models/hidream_image/pipeline_hidream_image.py @@ -31,6 +31,7 @@ from vllm_omni.diffusion.distributed.parallel_state import get_classifier_free_guidance_world_size from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.hidream_image import HiDreamImageTransformer2DModel from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin from vllm_omni.diffusion.profiler.diffusion_pipeline_profiler import DiffusionPipelineProfilerMixin @@ -177,24 +178,55 @@ def __init__( # Check if model is a local path local_files_only = os.path.exists(model) + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the in-repo component set before any from_pretrained + # (``text_encoder_4`` lives in a separate Llama repo and is unaffected). 
+ hidream_subfolders = [ + "scheduler", + "vae", + "text_encoder", + "tokenizer", + "text_encoder_2", + "tokenizer_2", + "text_encoder_3", + "tokenizer_3", + ] + prefetch_subfolders(model, hidream_subfolders, local_files_only=local_files_only) + self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.vae = AutoencoderKL.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) - self.text_encoder = CLIPTextModelWithProjection.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + AutoencoderKL.from_pretrained, + model, + subfolder="vae", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, + ).to(self.device) + self.text_encoder = from_pretrained_with_prefetch( + CLIPTextModelWithProjection.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, ) self.tokenizer = CLIPTokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.text_encoder_2 = CLIPTextModelWithProjection.from_pretrained( - model, subfolder="text_encoder_2", local_files_only=local_files_only + self.text_encoder_2 = from_pretrained_with_prefetch( + CLIPTextModelWithProjection.from_pretrained, + model, + subfolder="text_encoder_2", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, ) self.tokenizer_2 = CLIPTokenizer.from_pretrained( model, subfolder="tokenizer_2", local_files_only=local_files_only ) - self.text_encoder_3 = T5EncoderModel.from_pretrained( - model, subfolder="text_encoder_3", local_files_only=local_files_only + self.text_encoder_3 = from_pretrained_with_prefetch( + T5EncoderModel.from_pretrained, + model, + subfolder="text_encoder_3", + prefetch_list=hidream_subfolders, + local_files_only=local_files_only, ) self.tokenizer_3 = 
T5Tokenizer.from_pretrained( model, subfolder="tokenizer_3", local_files_only=local_files_only diff --git a/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5.py b/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5.py index 3688aa7c269..038898a16a8 100644 --- a/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5.py +++ b/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5.py @@ -25,6 +25,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.hunyuan_video.hunyuan_video_15_transformer import HunyuanVideo15Transformer3DModel from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin from vllm_omni.diffusion.models.t5_encoder import T5EncoderModel @@ -101,9 +102,20 @@ def __init__( model = od_config.model local_files_only = os.path.exists(model) + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained so + # peer workers load from a warm, complete cache. 
+ hv_subfolders = ["tokenizer", "text_encoder", "tokenizer_2", "text_encoder_2", "vae", "scheduler"] + prefetch_subfolders(model, hv_subfolders, local_files_only=local_files_only) + self.tokenizer = Qwen2Tokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.text_encoder = Qwen2_5_VLTextModel.from_pretrained( - model, subfolder="text_encoder", torch_dtype=dtype, local_files_only=local_files_only + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLTextModel.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=hv_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) self.tokenizer_2 = ByT5Tokenizer.from_pretrained( @@ -112,8 +124,13 @@ def __init__( t5_config = AutoConfig.from_pretrained(model, subfolder="text_encoder_2", local_files_only=local_files_only) self.text_encoder_2 = T5EncoderModel(t5_config, prefix="text_encoder_2").to(dtype=dtype, device=self.device) - self.vae = DistributedAutoencoderKLHunyuanVideo15.from_pretrained( - model, subfolder="vae", torch_dtype=torch.float32, local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + DistributedAutoencoderKLHunyuanVideo15.from_pretrained, + model, + subfolder="vae", + prefetch_list=hv_subfolders, + local_files_only=local_files_only, + torch_dtype=torch.float32, ).to(self.device) self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( diff --git a/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5_i2v.py b/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5_i2v.py index ad6339c8ce4..9eb29615009 100644 --- a/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5_i2v.py +++ b/vllm_omni/diffusion/models/hunyuan_video/pipeline_hunyuan_video_1_5_i2v.py @@ -32,6 +32,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from 
vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.hunyuan_video.hunyuan_video_15_transformer import HunyuanVideo15Transformer3DModel from vllm_omni.diffusion.models.hunyuan_video.pipeline_hunyuan_video_1_5 import ( extract_glyph_texts, @@ -123,9 +124,29 @@ def __init__( model = od_config.model local_files_only = os.path.exists(model) + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained so + # peer workers load from a warm, complete cache. + hv_subfolders = [ + "tokenizer", + "text_encoder", + "tokenizer_2", + "text_encoder_2", + "image_encoder", + "feature_extractor", + "vae", + "scheduler", + ] + prefetch_subfolders(model, hv_subfolders, local_files_only=local_files_only) + self.tokenizer = Qwen2Tokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.text_encoder = Qwen2_5_VLTextModel.from_pretrained( - model, subfolder="text_encoder", torch_dtype=dtype, local_files_only=local_files_only + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLTextModel.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=hv_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) self.tokenizer_2 = ByT5Tokenizer.from_pretrained( @@ -134,15 +155,25 @@ def __init__( t5_config = AutoConfig.from_pretrained(model, subfolder="text_encoder_2", local_files_only=local_files_only) self.text_encoder_2 = T5EncoderModel(t5_config, prefix="text_encoder_2").to(dtype=dtype, device=self.device) - self.image_encoder = SiglipVisionModel.from_pretrained( - model, subfolder="image_encoder", torch_dtype=dtype, local_files_only=local_files_only + self.image_encoder = from_pretrained_with_prefetch( + SiglipVisionModel.from_pretrained, + model, + 
subfolder="image_encoder", + prefetch_list=hv_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) self.feature_extractor = SiglipImageProcessor.from_pretrained( model, subfolder="feature_extractor", local_files_only=local_files_only ) - self.vae = DistributedAutoencoderKLHunyuanVideo15.from_pretrained( - model, subfolder="vae", torch_dtype=torch.float32, local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + DistributedAutoencoderKLHunyuanVideo15.from_pretrained, + model, + subfolder="vae", + prefetch_list=hv_subfolders, + local_files_only=local_files_only, + torch_dtype=torch.float32, ).to(self.device) self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( diff --git a/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image.py b/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image.py index 76d3efa2f85..ec942c25ff1 100644 --- a/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image.py +++ b/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image.py @@ -27,6 +27,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.longcat_image.longcat_image_transformer import LongCatImageTransformer2DModel from vllm_omni.diffusion.profiler.diffusion_pipeline_profiler import DiffusionPipelineProfilerMixin from vllm_omni.diffusion.request import OmniDiffusionRequest @@ -223,19 +224,32 @@ def __init__( model = od_config.model local_files_only = os.path.exists(model) + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. 
+ longcat_subfolders = ["scheduler", "text_encoder", "tokenizer", "vae"] + prefetch_subfolders(model, longcat_subfolders, local_files_only=local_files_only) + self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=longcat_subfolders, + local_files_only=local_files_only, ) self.text_processor = Qwen2VLProcessor.from_pretrained( model, subfolder="tokenizer", local_files_only=local_files_only ) - self.vae = AutoencoderKL.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) + self.vae = from_pretrained_with_prefetch( + AutoencoderKL.from_pretrained, + model, + subfolder="vae", + prefetch_list=longcat_subfolders, + local_files_only=local_files_only, + ).to(self.device) self.transformer = LongCatImageTransformer2DModel(od_config=od_config) self.tokenizer = AutoTokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) diff --git a/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image_edit.py b/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image_edit.py index 7eccf68636a..4bfa00a976d 100644 --- a/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image_edit.py +++ b/vllm_omni/diffusion/models/longcat_image/pipeline_longcat_image_edit.py @@ -29,6 +29,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from 
vllm_omni.diffusion.models.interface import SupportImageInput from vllm_omni.diffusion.models.longcat_image.longcat_image_transformer import ( LongCatImageTransformer2DModel, @@ -244,19 +245,32 @@ def __init__( model = od_config.model local_files_only = os.path.exists(model) + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. + longcat_subfolders = ["scheduler", "text_encoder", "text_processor", "tokenizer", "vae"] + prefetch_subfolders(model, longcat_subfolders, local_files_only=local_files_only) + self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=longcat_subfolders, + local_files_only=local_files_only, ) self.text_processor = Qwen2VLProcessor.from_pretrained( model, subfolder="text_processor", local_files_only=local_files_only ) - self.vae = AutoencoderKL.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) + self.vae = from_pretrained_with_prefetch( + AutoencoderKL.from_pretrained, + model, + subfolder="vae", + prefetch_list=longcat_subfolders, + local_files_only=local_files_only, + ).to(self.device) self.transformer = LongCatImageTransformer2DModel(od_config=od_config) self.tokenizer = AutoTokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) diff --git a/vllm_omni/diffusion/models/ltx2/pipeline_ltx2.py b/vllm_omni/diffusion/models/ltx2/pipeline_ltx2.py index 82f59e7ff63..a409776d29d 100644 --- a/vllm_omni/diffusion/models/ltx2/pipeline_ltx2.py +++ b/vllm_omni/diffusion/models/ltx2/pipeline_ltx2.py @@ -36,6 
+36,7 @@ from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.lora.manager import DiffusionLoRAManager from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.dmd2 import DMD2PipelineMixin from vllm_omni.diffusion.models.interface import SupportsComponentDiscovery from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin @@ -185,6 +186,19 @@ def __init__( ), ] + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. + ltx2_subfolders = [ + "tokenizer", + "text_encoder", + "connectors", + "vae", + "audio_vae", + "vocoder", + "scheduler", + ] + prefetch_subfolders(model, ltx2_subfolders, local_files_only=local_files_only) + self.tokenizer = AutoTokenizer.from_pretrained( model, subfolder="tokenizer", @@ -193,36 +207,46 @@ def __init__( # prefer mmap loading as default device is cuda, and the output of text encoder # could be deterministic. 
with torch.device("cpu"): - self.text_encoder = Gemma3ForConditionalGeneration.from_pretrained( + self.text_encoder = from_pretrained_with_prefetch( + Gemma3ForConditionalGeneration.from_pretrained, model, subfolder="text_encoder", - torch_dtype=dtype, + prefetch_list=ltx2_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) - self.connectors = LTX2TextConnectors.from_pretrained( + self.connectors = from_pretrained_with_prefetch( + LTX2TextConnectors.from_pretrained, model, subfolder="connectors", - torch_dtype=dtype, + prefetch_list=ltx2_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) - self.vae = AutoencoderKLLTX2Video.from_pretrained( + self.vae = from_pretrained_with_prefetch( + AutoencoderKLLTX2Video.from_pretrained, model, subfolder="vae", - torch_dtype=dtype, + prefetch_list=ltx2_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) - self.audio_vae = AutoencoderKLLTX2Audio.from_pretrained( + self.audio_vae = from_pretrained_with_prefetch( + AutoencoderKLLTX2Audio.from_pretrained, model, subfolder="audio_vae", - torch_dtype=dtype, + prefetch_list=ltx2_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) - self.vocoder = LTX2Vocoder.from_pretrained( + self.vocoder = from_pretrained_with_prefetch( + LTX2Vocoder.from_pretrained, model, subfolder="vocoder", - torch_dtype=dtype, + prefetch_list=ltx2_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) transformer_config = load_transformer_config(model, "transformer", local_files_only) diff --git a/vllm_omni/diffusion/models/ltx2/pipeline_ltx2_3.py b/vllm_omni/diffusion/models/ltx2/pipeline_ltx2_3.py index e6985e18623..d28ff69d985 100644 --- a/vllm_omni/diffusion/models/ltx2/pipeline_ltx2_3.py +++ b/vllm_omni/diffusion/models/ltx2/pipeline_ltx2_3.py @@ -45,6 +45,7 @@ ) from vllm_omni.diffusion.distributed.utils import get_local_device from 
vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin from vllm_omni.diffusion.request import OmniDiffusionRequest @@ -149,26 +150,59 @@ def __init__( ), ] + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. + ltx2_subfolders = [ + "tokenizer", + "text_encoder", + "connectors", + "vae", + "audio_vae", + "vocoder", + "scheduler", + ] + prefetch_subfolders(model, ltx2_subfolders, local_files_only=local_files_only) + # --- Tokenizer (lightweight, stays wherever) --- self.tokenizer = AutoTokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) # --- Text encoder: load on CPU, move to GPU only during encoding --- with torch.device("cpu"): - self.text_encoder = Gemma3ForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", torch_dtype=dtype, local_files_only=local_files_only + self.text_encoder = from_pretrained_with_prefetch( + Gemma3ForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=ltx2_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ) # --- Connectors: CPU (LTX-2.3 connectors include caption projection) --- - self.connectors = LTX2TextConnectors.from_pretrained( - model, subfolder="connectors", torch_dtype=dtype, local_files_only=local_files_only + self.connectors = from_pretrained_with_prefetch( + LTX2TextConnectors.from_pretrained, + model, + subfolder="connectors", + prefetch_list=ltx2_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ) # --- VAE, Audio VAE: CPU --- - self.vae = AutoencoderKLLTX2Video.from_pretrained( - model, subfolder="vae", torch_dtype=dtype, local_files_only=local_files_only + self.vae = 
from_pretrained_with_prefetch( + AutoencoderKLLTX2Video.from_pretrained, + model, + subfolder="vae", + prefetch_list=ltx2_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ) - self.audio_vae = AutoencoderKLLTX2Audio.from_pretrained( - model, subfolder="audio_vae", torch_dtype=dtype, local_files_only=local_files_only + self.audio_vae = from_pretrained_with_prefetch( + AutoencoderKLLTX2Audio.from_pretrained, + model, + subfolder="audio_vae", + prefetch_list=ltx2_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ) # --- Vocoder: prefer BWE vocoder (48kHz) for LTX-2.3 --- diff --git a/vllm_omni/diffusion/models/omnigen2/pipeline_omnigen2.py b/vllm_omni/diffusion/models/omnigen2/pipeline_omnigen2.py index 91af3263fbc..83ea7ed7363 100644 --- a/vllm_omni/diffusion/models/omnigen2/pipeline_omnigen2.py +++ b/vllm_omni/diffusion/models/omnigen2/pipeline_omnigen2.py @@ -32,6 +32,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.interface import SupportsComponentDiscovery from vllm_omni.diffusion.models.omnigen2.omnigen2_transformer import ( OmniGen2RotaryPosEmbed, @@ -673,23 +674,40 @@ def __init__( ) ] + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. 
+ omnigen2_subfolders = ["scheduler", "vae", "mllm", "processor"] + prefetch_subfolders(model, omnigen2_subfolders, local_files_only=local_files_only) + self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.vae = AutoencoderKL.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) + self.vae = from_pretrained_with_prefetch( + AutoencoderKL.from_pretrained, + model, + subfolder="vae", + prefetch_list=omnigen2_subfolders, + local_files_only=local_files_only, + ).to(self.device) transformer_kwargs = get_transformer_config_kwargs(od_config.tf_model_config, OmniGen2Transformer2DModel) self.transformer = OmniGen2Transformer2DModel( **transformer_kwargs, quant_config=od_config.quantization_config, ) - self.mllm = Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="mllm", local_files_only=local_files_only + self.mllm = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="mllm", + prefetch_list=omnigen2_subfolders, + local_files_only=local_files_only, ).to(self.device) - self.processor = Qwen2_5_VLProcessor.from_pretrained( - model, subfolder="processor", local_files_only=local_files_only + self.processor = from_pretrained_with_prefetch( + Qwen2_5_VLProcessor.from_pretrained, + model, + subfolder="processor", + prefetch_list=omnigen2_subfolders, + local_files_only=local_files_only, ) self.vae_scale_factor = ( 2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8 diff --git a/vllm_omni/diffusion/models/ovis_image/pipeline_ovis_image.py b/vllm_omni/diffusion/models/ovis_image/pipeline_ovis_image.py index 2484f02ed44..f77c5d419da 100644 --- a/vllm_omni/diffusion/models/ovis_image/pipeline_ovis_image.py +++ b/vllm_omni/diffusion/models/ovis_image/pipeline_ovis_image.py @@ -38,6 +38,7 @@ from 
vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.ovis_image.ovis_image_transformer import OvisImageTransformer2DModel from vllm_omni.diffusion.profiler.diffusion_pipeline_profiler import DiffusionPipelineProfilerMixin from vllm_omni.diffusion.request import OmniDiffusionRequest @@ -163,20 +164,32 @@ def __init__( self._execution_device = get_local_device() model = od_config.model local_files_only = os.path.exists(model) + + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. + ovis_subfolders = ["scheduler", "text_encoder", "vae", "tokenizer"] + prefetch_subfolders(model, ovis_subfolders, local_files_only=local_files_only) + self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = Qwen3Model.from_pretrained( + self.text_encoder = from_pretrained_with_prefetch( + Qwen3Model.from_pretrained, model, subfolder="text_encoder", + prefetch_list=ovis_subfolders, local_files_only=local_files_only, torch_dtype=od_config.dtype, ) - self.vae = AutoencoderKL.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self._execution_device - ) + self.vae = from_pretrained_with_prefetch( + AutoencoderKL.from_pretrained, + model, + subfolder="vae", + prefetch_list=ovis_subfolders, + local_files_only=local_files_only, + ).to(self._execution_device) self.tokenizer = Qwen2TokenizerFast.from_pretrained( model, subfolder="tokenizer", local_files_only=local_files_only diff --git a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image.py 
b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image.py index 61e50b06149..cdc6de99840 100644 --- a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image.py +++ b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image.py @@ -26,7 +26,7 @@ from vllm_omni.diffusion.distributed.autoencoders.autoencoder_kl_qwenimage import DistributedAutoencoderKLQwenImage from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders, retry_on_missing_shard +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.dmd2 import DMD2PipelineMixin from vllm_omni.diffusion.models.qwen_image.cfg_parallel import ( QwenImageCFGParallelMixin, @@ -279,18 +279,25 @@ def __init__( # See pipeline_qwen_image_edit_plus: guard against transformers v5 # multi-worker race on partial subfolder shard sets (Buildkite #1043). + qwen_subfolders = ["scheduler", "text_encoder", "vae", "tokenizer"] prefetch_subfolders( model, - ["scheduler", "text_encoder", "vae", "tokenizer"], + qwen_subfolders, ) self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = retry_on_missing_shard( - lambda: Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only - ) + # ``from_pretrained_with_prefetch`` re-prefetches and retries on a + # half-written cache (missing-shard ``OSError`` *and* the default + # -config size-mismatch ``RuntimeError`` that ``retry_on_missing_shard`` + # could not recover) instead of crashing the worker. 
+ self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, ) # Qwen2.5-VL ships a vision tower that text-to-image does not use. # Drop it while the model is still on CPU, before moving to GPU, so @@ -307,8 +314,12 @@ def __init__( else: logger.warning("Qwen-Image: vision tower not found on text encoder; skipping drop") self.text_encoder = self.text_encoder.to(self.device) - self.vae = DistributedAutoencoderKLQwenImage.from_pretrained( - model, subfolder="vae", local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + DistributedAutoencoderKLQwenImage.from_pretrained, + model, + subfolder="vae", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, ).to(self.device) transformer_kwargs = get_transformer_config_kwargs(od_config.tf_model_config, QwenImageTransformer2DModel) self.transformer = QwenImageTransformer2DModel( diff --git a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit.py b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit.py index c29fcb8ba33..35022e5eb2c 100644 --- a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit.py +++ b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit.py @@ -27,7 +27,7 @@ from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders, retry_on_missing_shard +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.interface import SupportImageInput from vllm_omni.diffusion.models.qwen_image.cfg_parallel import ( QwenImageCFGParallelMixin, @@ -247,28 +247,43 @@ def 
__init__( # See pipeline_qwen_image_edit_plus: guard against transformers v5 # multi-worker race on partial subfolder shard sets (Buildkite #1043). + qwen_subfolders = ["scheduler", "text_encoder", "vae", "tokenizer", "processor"] prefetch_subfolders( model, - ["scheduler", "text_encoder", "vae", "tokenizer", "processor"], + qwen_subfolders, ) self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = retry_on_missing_shard( - lambda: Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only - ).to(self.device) - ) + # ``from_pretrained_with_prefetch`` re-prefetches and retries on a + # half-written cache (missing-shard ``OSError`` *and* the default + # -config size-mismatch ``RuntimeError`` that ``retry_on_missing_shard`` + # could not recover) instead of crashing the worker. + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, + ).to(self.device) - self.vae = AutoencoderKLQwenImage.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) + self.vae = from_pretrained_with_prefetch( + AutoencoderKLQwenImage.from_pretrained, + model, + subfolder="vae", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, + ).to(self.device) transformer_kwargs = get_transformer_config_kwargs(od_config.tf_model_config, QwenImageTransformer2DModel) self.transformer = QwenImageTransformer2DModel(od_config=od_config, **transformer_kwargs) self.tokenizer = Qwen2Tokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.processor = retry_on_missing_shard( - lambda: Qwen2VLProcessor.from_pretrained(model, subfolder="processor", local_files_only=local_files_only) + self.processor = 
from_pretrained_with_prefetch( + Qwen2VLProcessor.from_pretrained, + model, + subfolder="processor", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, ) self.stage = None diff --git a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit_plus.py b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit_plus.py index ecdddc91fc3..e053f85dc10 100644 --- a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit_plus.py +++ b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_edit_plus.py @@ -25,7 +25,7 @@ from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders, retry_on_missing_shard +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.model_metadata import QWEN_IMAGE_EDIT_PLUS_MAX_INPUT_IMAGES from vllm_omni.diffusion.models.interface import SupportImageInput from vllm_omni.diffusion.models.qwen_image.cfg_parallel import ( @@ -216,29 +216,44 @@ def __init__( # #1043 for Qwen/Qwen-Image-Edit-2509). snapshot_download takes a # per-repo file lock so the first worker downloads and the rest # wait on a warm cache before loading. 
+ qwen_subfolders = ["scheduler", "text_encoder", "vae", "tokenizer", "processor"] prefetch_subfolders( model, - ["scheduler", "text_encoder", "vae", "tokenizer", "processor"], + qwen_subfolders, ) self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = retry_on_missing_shard( - lambda: Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only - ).to(self.device) - ) + # ``from_pretrained_with_prefetch`` re-prefetches and retries on a + # half-written cache (missing-shard ``OSError`` *and* the default + # -config size-mismatch ``RuntimeError`` that ``retry_on_missing_shard`` + # could not recover) instead of crashing the worker. + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, + ).to(self.device) - self.vae = AutoencoderKLQwenImage.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) + self.vae = from_pretrained_with_prefetch( + AutoencoderKLQwenImage.from_pretrained, + model, + subfolder="vae", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, + ).to(self.device) transformer_kwargs = get_transformer_config_kwargs(od_config.tf_model_config, QwenImageTransformer2DModel) self.transformer = QwenImageTransformer2DModel(od_config=od_config, **transformer_kwargs) self.tokenizer = Qwen2Tokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.processor = retry_on_missing_shard( - lambda: Qwen2VLProcessor.from_pretrained(model, subfolder="processor", local_files_only=local_files_only) + self.processor = from_pretrained_with_prefetch( + Qwen2VLProcessor.from_pretrained, + model, + subfolder="processor", + prefetch_list=qwen_subfolders, + 
local_files_only=local_files_only, ) self.stage = None diff --git a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py index df44404e813..a1fcff60d40 100644 --- a/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py +++ b/vllm_omni/diffusion/models/qwen_image/pipeline_qwen_image_layered.py @@ -24,7 +24,7 @@ from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders, retry_on_missing_shard +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.interface import SupportImageInput from vllm_omni.diffusion.models.qwen_image.autoencoder_kl_qwenimage import ( AutoencoderKLQwenImage, @@ -220,26 +220,41 @@ def __init__( # See pipeline_qwen_image_edit_plus: guard against transformers v5 # multi-worker race on partial subfolder shard sets (Buildkite #1043). 
+ qwen_subfolders = ["scheduler", "text_encoder", "vae", "tokenizer", "processor"] prefetch_subfolders( model, - ["scheduler", "text_encoder", "vae", "tokenizer", "processor"], + qwen_subfolders, ) # modules keep same as transformers & diffusers self.scheduler = FlowMatchEulerDiscreteScheduler.from_pretrained( model, subfolder="scheduler", local_files_only=local_files_only ) - self.text_encoder = retry_on_missing_shard( - lambda: Qwen2_5_VLForConditionalGeneration.from_pretrained( - model, subfolder="text_encoder", local_files_only=local_files_only - ).to(self.device) - ) - self.vae = AutoencoderKLQwenImage.from_pretrained(model, subfolder="vae", local_files_only=local_files_only).to( - self.device - ) + # ``from_pretrained_with_prefetch`` re-prefetches and retries on a + # half-written cache (missing-shard ``OSError`` *and* the default + # -config size-mismatch ``RuntimeError`` that ``retry_on_missing_shard`` + # could not recover) instead of crashing the worker. + self.text_encoder = from_pretrained_with_prefetch( + Qwen2_5_VLForConditionalGeneration.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, + ).to(self.device) + self.vae = from_pretrained_with_prefetch( + AutoencoderKLQwenImage.from_pretrained, + model, + subfolder="vae", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, + ).to(self.device) self.tokenizer = Qwen2Tokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.processor = retry_on_missing_shard( - lambda: Qwen2VLProcessor.from_pretrained(model, subfolder="processor", local_files_only=local_files_only) + self.processor = from_pretrained_with_prefetch( + Qwen2VLProcessor.from_pretrained, + model, + subfolder="processor", + prefetch_list=qwen_subfolders, + local_files_only=local_files_only, ) # modules re-implemented for vLLM-Omni diff --git a/vllm_omni/diffusion/models/sd3/pipeline_sd3.py 
b/vllm_omni/diffusion/models/sd3/pipeline_sd3.py index 22bb8ff0527..7eeffdde4d4 100644 --- a/vllm_omni/diffusion/models/sd3/pipeline_sd3.py +++ b/vllm_omni/diffusion/models/sd3/pipeline_sd3.py @@ -19,7 +19,7 @@ from vllm_omni.diffusion.distributed.cfg_parallel import CFGParallelMixin from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.sd3.sd3_transformer import ( SD3Transformer2DModel, ) @@ -156,18 +156,19 @@ def __init__( # See ``hub_prefetch.py`` for the transformers v5 subfolder race. # SD3.5 loads six subfolders in a row, each with multi-shard # safetensors - it is the worst-case fan-out for the race window. + sd3_subfolders = [ + "scheduler", + "tokenizer", + "tokenizer_2", + "tokenizer_3", + "text_encoder", + "text_encoder_2", + "text_encoder_3", + "vae", + ] prefetch_subfolders( model, - [ - "scheduler", - "tokenizer", - "tokenizer_2", - "tokenizer_3", - "text_encoder", - "text_encoder_2", - "text_encoder_3", - "vae", - ], + sd3_subfolders, local_files_only=local_files_only, ) @@ -198,31 +199,43 @@ def __init__( self.tokenizer_3 = T5Tokenizer.from_pretrained( model, subfolder="tokenizer_3", local_files_only=local_files_only ) - self.text_encoder = CLIPTextModelWithProjection.from_pretrained( + # ``from_pretrained_with_prefetch`` re-prefetches and retries if a + # peer worker left the cache half-written (missing-shard ``OSError`` or + # the default-config size-mismatch ``RuntimeError``) instead of + # crashing the worker - critical here given the six-subfolder fan-out. 
+ self.text_encoder = from_pretrained_with_prefetch( + CLIPTextModelWithProjection.from_pretrained, model, subfolder="text_encoder", - torch_dtype=dtype, + prefetch_list=sd3_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ) - self.text_encoder_2 = CLIPTextModelWithProjection.from_pretrained( + self.text_encoder_2 = from_pretrained_with_prefetch( + CLIPTextModelWithProjection.from_pretrained, model, subfolder="text_encoder_2", - torch_dtype=dtype, + prefetch_list=sd3_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ) - self.text_encoder_3 = T5EncoderModel.from_pretrained( + self.text_encoder_3 = from_pretrained_with_prefetch( + T5EncoderModel.from_pretrained, model, subfolder="text_encoder_3", - torch_dtype=dtype, + prefetch_list=sd3_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ) self.transformer = SD3Transformer2DModel(od_config=od_config) - self.vae = DistributedAutoencoderKL.from_pretrained( + self.vae = from_pretrained_with_prefetch( + DistributedAutoencoderKL.from_pretrained, model, subfolder="vae", - torch_dtype=dtype, + prefetch_list=sd3_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1) if getattr(self, "vae", None) else 8 diff --git a/vllm_omni/diffusion/models/stable_audio/pipeline_stable_audio.py b/vllm_omni/diffusion/models/stable_audio/pipeline_stable_audio.py index 7816e49217a..f59215e0c23 100644 --- a/vllm_omni/diffusion/models/stable_audio/pipeline_stable_audio.py +++ b/vllm_omni/diffusion/models/stable_audio/pipeline_stable_audio.py @@ -28,6 +28,7 @@ from vllm_omni.diffusion.data import DiffusionOutput, OmniDiffusionConfig from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders 
from vllm_omni.diffusion.models.interface import SupportAudioOutput from vllm_omni.diffusion.models.stable_audio.stable_audio_transformer import ( StableAudioDiTModel, @@ -109,6 +110,11 @@ def __init__( ), ] + # See ``hub_prefetch.py`` for the transformers v5 multi-worker subfolder + # race; prefetch the whole component set before any from_pretrained. + sa_subfolders = ["tokenizer", "text_encoder", "vae", "projection_model", "scheduler"] + prefetch_subfolders(model, sa_subfolders, local_files_only=local_files_only) + # Load tokenizer self.tokenizer = T5TokenizerFast.from_pretrained( model, @@ -117,27 +123,33 @@ def __init__( ) # Load text encoder - self.text_encoder = T5EncoderModel.from_pretrained( + self.text_encoder = from_pretrained_with_prefetch( + T5EncoderModel.from_pretrained, model, subfolder="text_encoder", - torch_dtype=dtype, + prefetch_list=sa_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) # Load VAE (AutoencoderOobleck for audio) - self.vae = AutoencoderOobleck.from_pretrained( + self.vae = from_pretrained_with_prefetch( + AutoencoderOobleck.from_pretrained, model, subfolder="vae", - torch_dtype=torch.float32, + prefetch_list=sa_subfolders, local_files_only=local_files_only, + torch_dtype=torch.float32, ).to(self.device) # Load projection model (using diffusers implementation) - self.projection_model = StableAudioProjectionModel.from_pretrained( + self.projection_model = from_pretrained_with_prefetch( + StableAudioProjectionModel.from_pretrained, model, subfolder="projection_model", - torch_dtype=dtype, + prefetch_list=sa_subfolders, local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) # Initialize transformer from HF config to keep architecture aligned with checkpoint. 
diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py index 2d8c752a4eb..85a58596050 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2.py @@ -26,7 +26,7 @@ from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.forward_context import set_forward_context_denoise_step_idx from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.dmd2 import DMD2PipelineMixin from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin, _is_rank_zero from vllm_omni.diffusion.models.schedulers import FlowUniPCMultistepScheduler @@ -336,18 +336,39 @@ def __init__( ) # See ``hub_prefetch.py`` for the transformers v5 subfolder race. + component_subfolders = ["tokenizer", "text_encoder", "vae"] prefetch_subfolders( model, - ["tokenizer", "text_encoder", "vae"], + component_subfolders, local_files_only=local_files_only, ) - self.tokenizer = AutoTokenizer.from_pretrained(model, subfolder="tokenizer", local_files_only=local_files_only) - self.text_encoder = UMT5EncoderModel.from_pretrained( - model, subfolder="text_encoder", torch_dtype=dtype, local_files_only=local_files_only + # ``from_pretrained_with_prefetch`` re-prefetches and retries if the + # cache is still half-written (the missing-shard ``OSError`` and the + # default-``UMT5Config`` size-mismatch ``RuntimeError`` seen on multi + # -worker HSDP / ring launches), instead of crashing the worker. 
+ self.tokenizer = from_pretrained_with_prefetch( + AutoTokenizer.from_pretrained, + model, + subfolder="tokenizer", + prefetch_list=component_subfolders, + local_files_only=local_files_only, + ) + self.text_encoder = from_pretrained_with_prefetch( + UMT5EncoderModel.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=component_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) - self.vae = DistributedAutoencoderKLWan.from_pretrained( - model, subfolder="vae", torch_dtype=dtype, local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + DistributedAutoencoderKLWan.from_pretrained, + model, + subfolder="vae", + prefetch_list=component_subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) # Initialize transformers with correct config (weights loaded via load_weights) diff --git a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py index 5ff3742051f..68eb3b11bc0 100644 --- a/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py +++ b/vllm_omni/diffusion/models/wan2_2/pipeline_wan2_2_i2v.py @@ -26,7 +26,7 @@ from vllm_omni.diffusion.distributed.utils import get_local_device from vllm_omni.diffusion.forward_context import set_forward_context_denoise_step_idx from vllm_omni.diffusion.model_loader.diffusers_loader import DiffusersPipelineLoader -from vllm_omni.diffusion.model_loader.hub_prefetch import prefetch_subfolders +from vllm_omni.diffusion.model_loader.hub_prefetch import from_pretrained_with_prefetch, prefetch_subfolders from vllm_omni.diffusion.models.dmd2 import DMD2PipelineMixin from vllm_omni.diffusion.models.interface import SupportImageInput from vllm_omni.diffusion.models.progress_bar import ProgressBarMixin, _is_rank_zero @@ -197,35 +197,64 @@ def __init__( ) ) - # Text encoder - self.tokenizer = AutoTokenizer.from_pretrained(model, subfolder="tokenizer", 
local_files_only=local_files_only) - self.text_encoder = UMT5EncoderModel.from_pretrained( - model, subfolder="text_encoder", torch_dtype=dtype, local_files_only=local_files_only - ).to(self.device) - # Image encoder (CLIP) - optional, for Wan2.1-style I2V self.has_image_encoder = "image_encoder" in model_index and model_index["image_encoder"][0] is not None - # See ``hub_prefetch.py`` for the transformers v5 subfolder race. + # See ``hub_prefetch.py`` for the transformers v5 subfolder race. The + # prefetch MUST run before any ``from_pretrained`` below - previously + # the tokenizer / text_encoder were loaded ahead of this call, so they + # never benefited from the prefetch and could hit the half-written + # cache directly. subfolders = ["tokenizer", "text_encoder", "vae"] if self.has_image_encoder: subfolders.extend(["image_processor", "image_encoder"]) prefetch_subfolders(model, subfolders, local_files_only=local_files_only) + # Text encoder + self.tokenizer = from_pretrained_with_prefetch( + AutoTokenizer.from_pretrained, + model, + subfolder="tokenizer", + prefetch_list=subfolders, + local_files_only=local_files_only, + ) + self.text_encoder = from_pretrained_with_prefetch( + UMT5EncoderModel.from_pretrained, + model, + subfolder="text_encoder", + prefetch_list=subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, + ).to(self.device) + if self.has_image_encoder: - self.image_processor = CLIPImageProcessor.from_pretrained( - model, subfolder="image_processor", local_files_only=local_files_only + self.image_processor = from_pretrained_with_prefetch( + CLIPImageProcessor.from_pretrained, + model, + subfolder="image_processor", + prefetch_list=subfolders, + local_files_only=local_files_only, ) - self.image_encoder = CLIPVisionModel.from_pretrained( - model, subfolder="image_encoder", torch_dtype=dtype, local_files_only=local_files_only + self.image_encoder = from_pretrained_with_prefetch( + CLIPVisionModel.from_pretrained, + model, + 
subfolder="image_encoder", + prefetch_list=subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) else: self.image_processor = None self.image_encoder = None # VAE - self.vae = DistributedAutoencoderKLWan.from_pretrained( - model, subfolder="vae", torch_dtype=dtype, local_files_only=local_files_only + self.vae = from_pretrained_with_prefetch( + DistributedAutoencoderKLWan.from_pretrained, + model, + subfolder="vae", + prefetch_list=subfolders, + local_files_only=local_files_only, + torch_dtype=dtype, ).to(self.device) # Transformers (weights loaded via load_weights)