Skip to content

Merge epd feature from Dr.zheng's repo into private repo#103

Closed
kechengliu97 wants to merge 1 commit intov0.11.0from
v0.11.0-myfix
Closed

Merge epd feature from Dr.zheng's repo into private repo#103
kechengliu97 wants to merge 1 commit intov0.11.0from
v0.11.0-myfix

Conversation

@kechengliu97
Copy link
Collaborator

@kechengliu97 kechengliu97 commented Oct 25, 2025

Disaggregated Encoder

A disaggregated encoder runs the vision-encoder stage of a multimodal LLM in a process that is separate from the prefill / decoder stage. Deploying these two stages in independent vLLM instances brings three practical benefits:

  1. Independent, fine-grained scaling
  2. Lower time-to-first-token (TTFT)
  3. Cross-process reuse and caching of encoder outputs

We received a series of feedback from the 21740 (Many thanks to @LastZhabka for the excellent work on the initial version), and based on this feedback, we updated our overall design logic.

Design doc: https://docs.google.com/document/d/1aed8KtC6XkXtdoV87pWT0a8OJlZ-CpnuLLzmR8l9BAE

@ywang96 @NickLucche @DarkLight1337 @WoosukKwon

1 New Encoder Cache Connector

Disaggregated encoding in v1 is enabled by an explicit EC connector abstraction. The ECConnectorBase defines a clear split of responsibilities between the scheduler-side and worker-side, and provides the lifecycle hooks required to check, load, save, and track encoder cache (EC) across processes.

ECConnector – interface for retrieving EC caches produced by the encoder.

  • Scheduler role – checks cache existence and schedules loads. (lives in the scheduler process):

    • check_caches_exist(request) -> list[bool]: probe whether EC exists for each multimodal datum in the request.
    • update_state_after_alloc(request, index): record EC loading intent once encoder cache space is allocated for a given mm datum.
    • build_connector_meta(scheduler_output) -> ECConnectorMetadata: build per-step metadata consumed by workers; also resets connector internal state for the next step.
    • request_finished(request) -> (bool, Optional[dict]): optionally indicate async save/send in progress and attach metadata.
    • update_connector_output(connector_output): ingest worker-side completion info.
  • Worker role – loads the embeddings into memory. (lives in each worker process):

    • bind_connector_metadata(meta) / clear_connector_metadata(): attach per-step metadata from the scheduler before/after forward.
    • start_load_caches(**kwargs): load required EC into the local encoder cache before embeddings are gathered.
    • save_caches(**kwargs): persist/transfer EC produced on the encoder.
    • wait_for_save(): block until outstanding saves complete (if any).
    • get_finished(finished_req_ids) -> (set[str]|None, set[str]|None): report completion of async send/recv to help scheduler bookkeeping.

2 Change highlights (Scheduler and GPUModelRunner)

  • Scheduler (vllm/v1/core/sched/scheduler.py):

    • Initializes an EC connector when ec_transfer_config is present via ECConnectorFactory.create_connector(..., role=SCHEDULER).
    • Tracks an encoder-token budget and skips EC work when cache already exists (local or remote as reported by the connector).
    • After allocation, calls ec_connector.update_state_after_alloc(...) so the next build_connector_meta(...) includes the mm hashes to load.
    • Emits ec_connector_metadata in SchedulerOutput for the workers, and ingests worker completion via ec_connector.update_connector_output(...).
  • Worker (vllm/v1/worker/gpu_model_runner.py):

    • Encoder-side (producer):

      • Within execute_model, when get_ec_transfer().is_producer is True, the runner enters with maybe_get_ec_connector_output(..., encoder_cache=self.encoder_cache): before running the multimodal encoder.
      • The encode pass computes embeddings and writes them into encoder_cache[mm_hash].
      • Immediately after finishing the encode for a given mm_hash, the runner calls maybe_save_ec_to_connector(self.encoder_cache, mm_hash) which invokes ECConnectorBase.save_caches(encoder_cache=..., mm_hash=...).
      • On context exit, wait_for_save() is invoked (if enabled) to ensure the persisted EC is durable/visible to consumers; get_finished(...) is queried to surface completion status back to the scheduler.
    • PD-side (consumer):

      • For requests scheduled on PD, the scheduler supplies ec_connector_metadata that lists the mm_hash items needing loads.
      • The runner binds this metadata and calls start_load_caches(encoder_cache=self.encoder_cache) prior to _gather_mm_embeddings, allowing the connector to populate encoder_cache[mm_hash] from the external store.
      • _gather_mm_embeddings then reads the loaded tensors from encoder_cache and returns them as multimodal embeddings for the subsequent decoder input embedding construction.
      • After the forward step, the runner clears metadata; any connector-furnished completion info is recorded into ECConnectorOutput for the scheduler to free resources when safe.

3 Usage Example

The current reference pathway is ECSharedStorageConnector. Below ready-to-run scripts show the workflows:

  • 1 Encoder + 1 PD:

    • examples/online_serving/disaggregated_encoder/shared_storage_connector/disagg_1e1pd_example.sh
    • (legacy/simple demo) examples/online_serving/disaggregated_encoder/shared_storage_connector/disagg_encoder_example.sh
  • 1 Encoder + 1 Prefill + 1 Decode:

    • examples/online_serving/disaggregated_encoder/shared_storage_connector/disagg_1e1p1d_example.sh

3.1 Minimal ECTransfer CLI config

The Encoder and PD share/transfer encoder cache (EC) via EC transfer. The current reference implementation is ECSharedStorageConnector (dump/load via a shared directory).

  • Encoder (producer) example:
vllm serve <model> \
  --ec-transfer-config '{
    "ec_connector": "ECSharedStorageConnector",
    "ec_role": "ec_producer",
    "ec_connector_extra_config": {"shared_storage_path": "/tmp"}
  }'
  • PD (consumer) example:
vllm serve <model> \
  --ec-transfer-config '{
    "ec_connector": "ECSharedStorageConnector",
    "ec_role": "ec_consumer",
    "ec_connector_extra_config": {"shared_storage_path": "/tmp"}
  }'

Notes:

  • If shared_storage_path is not explicitly set, /tmp is used by default.
  • This connector persists EC per mm_hash to /<shared_storage_path>/<mm_hash>/encoder_cache.safetensors, and the PD side loads it to GPU on demand.
  • Connector selection and loading are handled by ECConnectorFactory (with ECSharedStorageConnector registered by default).

4 Development

Here is a figure illustrating disaggregate encoder flow:

Disaggregated Encoder Flow

Disaggregated encoding is implemented by running two parts:

  • Encoder instance – a vLLM instance to performs vision encoding.

  • Prefill/Decode (PD) instance(s) – runs language pre-fill and decode.

    • PD can be a single instance (E->PD, see disagg_1e1pd_example.sh / disagg_encoder_example.sh), or disaggregated with Decode (E->P->D, see disagg_1e1p1d_example.sh).

A connector transfers encoder-cache (EC) embeddings from the encoder instance to the PD instance. All related code is under vllm/distributed/ec_transfer.

For the PD disaggregation part, the Prefill instance receives cache exactly the same as the disaggregate encoder flow above. Prefill instance executes 1 step (prefill -> 1 token output) and then transfers KV cache to the Decode instance for the remaining execution. The KV transfer part purely happens after the execution of the Prefill instance. docs/features/disagg_prefill.md shows the brief idea about the disaggregated prefill (v0). We create the example setup with the NixlConnector from vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py in vllm V1 and referred to the tests/v1/kv_connector/nixl_integration/toy_proxy_server.py to facilitate the kv transfer between P and D;

5. Next-Step Plan

5.1 Broaden EC Connector Types

5.2 Multi-Hardware Platform Support

5.3 Comprehensive Performance Evaluation

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a significant new feature: the disaggregated encoder. This allows running the vision encoder in a separate process from the prefill/decoder, which can lead to better resource utilization, scaling, and lower time-to-first-token. The implementation introduces a new ECConnector abstraction for managing the transfer of encoder cache between processes, with an initial ECSharedStorageConnector implementation. The changes are well-structured, with modifications to the scheduler and GPU model runner to integrate this new capability. The addition of comprehensive documentation, examples, and tests (both unit and integration) is commendable and greatly helps in understanding and verifying the feature.

My review has identified one high-severity issue related to a race condition in the ECSharedStorageConnector when used in a multi-producer setup, which could lead to data corruption. A suggestion to add file-based locking is provided to address this. Overall, this is a high-quality contribution.

Comment on lines +114 to +118
filename = self._generate_filename_debug(mm_hash)
ec_cache = encoder_cache[mm_hash]
tensors = {"ec_cache": ec_cache.detach().cpu()}
safetensors.torch.save_file(tensors, filename)
logger.debug("Save cache successful for mm_hash %s", mm_hash)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation of save_caches is not safe for concurrent writes from multiple producer processes. If multiple encoder instances are configured to use the same shared_storage_path, they could race to write the cache for the same mm_hash, potentially leading to corrupted encoder_cache.safetensors files. The safetensors.torch.save_file operation is not guaranteed to be atomic across different processes.

To make this connector safe for a multi-producer setup, a file-based locking mechanism (e.g., using fcntl on Unix-like systems) should be implemented around the file writing operations. This ensures that only one process can write to a given cache file at a time.

Suggested change
filename = self._generate_filename_debug(mm_hash)
ec_cache = encoder_cache[mm_hash]
tensors = {"ec_cache": ec_cache.detach().cpu()}
safetensors.torch.save_file(tensors, filename)
logger.debug("Save cache successful for mm_hash %s", mm_hash)
import fcntl
import os
filename = self._generate_filename_debug(mm_hash)
lock_filename = filename + ".lock"
# Use a lock file to prevent race conditions from multiple producers.
with open(lock_filename, "w") as lock_file:
fcntl.flock(lock_file, fcntl.LOCK_EX)
try:
# Double-check if another process created the cache while waiting for the lock.
if not os.path.exists(filename):
ec_cache = encoder_cache[mm_hash]
tensors = {"ec_cache": ec_cache.detach().cpu()}
safetensors.torch.save_file(tensors, filename)
logger.debug("Save cache successful for mm_hash %s", mm_hash)
else:
logger.debug("Cache for mm_hash %s already exists, skipping save.", mm_hash)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)

@zengchuang-hw zengchuang-hw deleted the v0.11.0-myfix branch October 30, 2025 07:44
cursor bot pushed a commit to Shirley125/vllm_epd that referenced this pull request Jan 22, 2026
Signed-off-by: WANG Cong <115451386+congw729@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant