
Picking up session-based streaming input from vLLM #986

Closed
vklimkov-nvidia wants to merge 5 commits into vllm-project:main from vklimkov-nvidia:vklimkov/alm_api_v4

Conversation

@vklimkov-nvidia

Purpose

Introduce streaming input support.

Streaming input was recently introduced to vLLM main: vllm-project/vllm#28973.
It is an important feature for real-time processing, for example dialogue agents and simultaneous translation systems. The feature adds StreamingInput as an alternative input to the generate method. This PR mainly updates the vllm-omni abstractions to stay up to date with the latest developments in vllm.
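
For illustration, here is a minimal sketch of the call shape (a hypothetical two-chunk session; the token ids, request id, and engine variable are placeholders, and a full runnable example appears later in this conversation):

from collections.abc import AsyncGenerator

from vllm import SamplingParams
from vllm.sampling_params import RequestOutputKind
from vllm.v1.engine.async_llm import StreamingInput

# Streaming input requires DELTA outputs (see the note in the full example below).
params = SamplingParams(max_tokens=8, output_kind=RequestOutputKind.DELTA)

async def input_chunks() -> AsyncGenerator[StreamingInput, None]:
    # Each yielded chunk extends the same decoding session.
    yield StreamingInput(prompt={"prompt_token_ids": [1, 2, 3]}, sampling_params=params)
    yield StreamingInput(prompt={"prompt_token_ids": [4, 5]}, sampling_params=params)

async def consume(engine) -> None:
    # The async generator is passed to generate() in place of a single prompt.
    async for output in engine.generate(
        input_chunks(),
        sampling_params=params,
        request_id="example-session",
    ):
        print(output.outputs[0].text, output.finished)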

Test Plan

Check that the change doesn't affect any current tests.
Introduce a simple e2e test for streaming input

Test Result

In progress


Essential Elements of an Effective PR Description Checklist
  • [x] The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results

Signed-off-by: Viacheslav Klimkov <vklimkov@nvidia.com>
Signed-off-by: Viacheslav Klimkov <vklimkov@nvidia.com>

@chatgpt-codex-connector (bot) left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: eb736f8e78


Comment on lines +352 to +354
# For streaming input, mark output as not finished while streaming
if isinstance(req_state, OmniRequestState) and req_state.streaming_input:
ro.finished = False


P1: Avoid marking final streaming outputs unfinished

The new streaming-input path forces ro.finished = False whenever req_state.streaming_input is true, but the cleanup branch for a finished request (lines 372–377) only clears input_chunk_queue and never flips streaming_input or calls _finish_request when there are no more chunks. This means the last chunk in a streaming-input session is still emitted as unfinished and the request state remains active, so non‑streaming OpenAI chat handling drops the output entirely (serving_chat.py filters out finished == False at lines 1336–1337). The result is a missing final response and a request-state leak for streaming-input sessions that end without additional queued chunks.


Author


I believe the concern is not valid; this change mirrors the corresponding one in vllm.

While a session is active, every output has finished=False, because additional input might still arrive from the user. The actual final signal comes through STREAM_FINISHED being pushed to the queue when the generator closes.

Streaming input is an internal API that one can use via engine.generate(); it is not exposed through completions.

There is no request-state leak: _update_streaming_request_state handles finalization.
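
To make the intended lifecycle concrete, here is a rough sketch of the semantics described above (the snippet under review plus illustrative comments; names such as input_chunk_queue, STREAM_FINISHED, and _update_streaming_request_state come from this discussion, and the control flow shown is a simplification, not the actual implementation):

# While a streaming-input session is active, every emitted output is marked
# unfinished, because more input chunks may still arrive from the user.
if isinstance(req_state, OmniRequestState) and req_state.streaming_input:
    ro.finished = False

# When the caller's input generator closes, a STREAM_FINISHED sentinel is
# pushed onto the session's input_chunk_queue; _update_streaming_request_state
# then finalizes the request, so the request state is released rather than
# leaked.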

@vklimkov-nvidia
Author

Streaming input does not work for prompt embeddings. Here is an example of iteratively providing input tokens for a text Llama model: we basically replace what was decoded with a different token and proceed with the next decoding step.

from vllm.usage.usage_lib import UsageContext

from vllm_omni.engine.arg_utils import AsyncOmniEngineArgs
from vllm_omni.entrypoints.async_omni_llm import AsyncOmniLLM

# Model configuration
MODEL = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

# Create engine args - similar to vLLM's AsyncEngineArgs but with omni extensions
engine_args = AsyncOmniEngineArgs(
    model=MODEL,
    model_arch="LlamaForCausalLM",  # Architecture for TinyLlama
    model_stage="thinker",
    stage_id=0,
    engine_output_type="text",
    gpu_memory_utilization=0.8,
    max_model_len=512,
    enforce_eager=False,
)

# Create vLLM config from engine args
usage_context = UsageContext.LLM_CLASS
vllm_config = engine_args.create_engine_config(usage_context=usage_context)

# Create AsyncOmniLLM using the factory method
engine = AsyncOmniLLM.from_vllm_config(
    vllm_config=vllm_config,
    engine_args=engine_args,
    usage_context=usage_context,
)
import asyncio
from collections.abc import AsyncGenerator

from vllm import SamplingParams
from vllm.v1.engine.async_llm import StreamingInput
from vllm.sampling_params import RequestOutputKind

# Get tokenizer for encoding/decoding
tokenizer = engine.tokenizer

# Initial prompt and injection phrase
initial_prompt = "what is your name."
inject_phrase = "i am james and i like to play football everyday"

prompt_tokens = tokenizer.encode(initial_prompt, add_special_tokens=False)
inject_tokens = tokenizer.encode(inject_phrase, add_special_tokens=False)

print(f"Initial prompt: '{initial_prompt}'")
print(f"Prompt tokens: {prompt_tokens}")
print(f"\nInjection phrase: '{inject_phrase}'")
print(f"Injection tokens ({len(inject_tokens)}): {inject_tokens}")
print("=" * 70)

# Sampling params for streaming input
# NOTE: output_kind must be DELTA (not FINAL_ONLY), stop strings not supported
sampling_params = SamplingParams(
    max_tokens=1,  # Generate 1 token per input chunk
    temperature=0.7, 
    top_p=0.9, 
    ignore_eos=True,
    output_kind=RequestOutputKind.DELTA,
)
session_id = "streaming_session"

# Queue for synchronizing: output consumer tells input producer when to send next chunk
next_input_queue: asyncio.Queue[int | None] = asyncio.Queue()

# Track state
kv_context = prompt_tokens.copy()
decoded_tokens: list[int] = []

async def streaming_input_generator() -> AsyncGenerator[StreamingInput, None]:
    """Async generator that yields input chunks.
    
    This generator:
    1. Sends the initial prompt
    2. Waits for signal from output consumer
    3. Sends injection tokens one at a time
    4. Exits when all injection tokens are sent
    """
    # Send initial prompt
    yield StreamingInput(
        prompt={"prompt_token_ids": prompt_tokens},
        sampling_params=sampling_params,
    )
    
    # Wait for outputs and inject tokens
    step = 0
    while step < len(inject_tokens):
        # Wait for signal from output consumer (contains the step number or None to stop)
        signal = await next_input_queue.get()
        if signal is None:
            break
            
        # Inject next token
        inject_token = inject_tokens[step]
        inject_text = tokenizer.decode([inject_token])
        print(f"  Injecting token {step}: '{inject_text}' (id={inject_token})")
        kv_context.append(inject_token)
        
        # Send the injection token as next input chunk
        yield StreamingInput(
            prompt={"prompt_token_ids": [inject_token]},
            sampling_params=sampling_params,
        )
        step += 1
    
    print("Input generator finished")

# Run the streaming session.
# NOTE: this uses top-level `async for`, so run it in an async context
# (e.g. a notebook or `python -m asyncio`) or wrap it in a coroutine
# driven by asyncio.run().
step = 0
async for output in engine.generate(
    streaming_input_generator(),  # Pass the async generator directly!
    sampling_params=sampling_params,
    request_id=session_id,
):
    # Process each output
    if output.outputs and output.outputs[0].token_ids:
        decoded_token = output.outputs[0].token_ids[-1]
        decoded_text = tokenizer.decode([decoded_token])
        decoded_tokens.append(decoded_token)
        kv_context.append(decoded_token)
        
        print(f"Step {step}: decoded '{decoded_text}' (id={decoded_token})")
        
        # Tell input generator to send next token
        if step < len(inject_tokens):
            await next_input_queue.put(step)
        
        step += 1
    
    # Check if we should stop
    if step >= len(inject_tokens):
        # Let the input generator know we're done
        await next_input_queue.put(None)
        break

@Gaohan123 mentioned this pull request on Jan 28, 2026
… input requests

Signed-off-by: Viacheslav Klimkov <vklimkov@nvidia.com>
Collaborator

@Gaohan123 left a comment


Thanks for your contribution. Please refer to the AI review and my reviews. Besides, there are a few questions and suggestions:

  1. Which version of vLLM does the PR work on?
  2. Could you please provide an example of validation?
  3. Please supplement some unit tests protecting the key methods you added.

Collaborator


Why do you remove this?

Author


which version of vLLM does the PR work on?

This PR is based directly on top of the change we want to pick up, input streaming. It was merged to main in 91601ff47810de38fa117e09e34c18504a919a68. I run vllm-omni with vllm from that commit. If you'd like a specific vllm version after the commit, let me know and I will move to it.

Could you please provide an example of validation?

Added an e2e test; here is the basic flow: test

Please supplement some Unit Tests for protecting key methods you added.

Added unit tests:

  • test_omni_ar_scheduler_streaming.py
  • test_output_processor_streaming.py
  • test_omni_gpu_model_runner_streaming.py

More tests for this change are in vllm: vllm-project/vllm#28973.
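
For context, a rough sketch of what the streaming-input e2e flow boils down to (pytest-asyncio style; the engine fixture, token ids, and assertion are placeholders, not the actual test code):

import pytest

from vllm import SamplingParams
from vllm.sampling_params import RequestOutputKind
from vllm.v1.engine.async_llm import StreamingInput

@pytest.mark.asyncio
async def test_streaming_input_session(engine):
    # `engine` stands for a hypothetical AsyncOmniLLM fixture.
    params = SamplingParams(
        max_tokens=1, ignore_eos=True, output_kind=RequestOutputKind.DELTA
    )

    async def chunks():
        # Two input chunks feeding the same decoding session.
        yield StreamingInput(prompt={"prompt_token_ids": [1, 2, 3]}, sampling_params=params)
        yield StreamingInput(prompt={"prompt_token_ids": [4]}, sampling_params=params)

    outputs = [
        out
        async for out in engine.generate(
            chunks(), sampling_params=params, request_id="e2e-streaming"
        )
    ]
    # Expect at least one decoded output per input chunk in this minimal setup.
    assert outputs and all(out.outputs for out in outputs)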

from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.executor.abstract import Executor
from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager
from vllm.v1.executor import Executor
Collaborator


All the modifications in this file seem to be related only to logging. Are they related to streaming input?

Author


Not really. I am trying to use a more recent vllm (with streaming input), and it has changes to the logging. AsyncOmniLLM is built on top of AsyncLLM, which now has these new logger-related arguments (here).

Since we don't call the parent constructor, we need to replicate the changes in async_omni_llm as well.
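
For illustration, a rough sketch of the kind of replication being described (the parameter names and the StatLoggerManager construction below are assumptions based on the imports in this diff; the authoritative signatures are in vllm's AsyncLLM):

from vllm.v1.metrics.loggers import StatLoggerFactory, StatLoggerManager

class AsyncOmniLLM:
    def __init__(
        self,
        vllm_config,
        log_stats: bool = True,
        stat_loggers: list[StatLoggerFactory] | None = None,
    ) -> None:
        # AsyncOmniLLM does not call AsyncLLM.__init__, so the logger wiring
        # that AsyncLLM gained recently has to be replicated here.
        self.log_stats = log_stats
        self.logger_manager: StatLoggerManager | None = None
        if log_stats:
            # Illustrative construction; see AsyncLLM.__init__ for the real arguments.
            self.logger_manager = StatLoggerManager(
                vllm_config=vllm_config,
                custom_stat_loggers=stat_loggers,
            )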

@Gaohan123
Collaborator

@vklimkov-nvidia Hello, any updates?

@vklimkov-nvidia
Author

Sorry for the delay on this! I got caught up with some updates to our fork (https://github.com/vklimkov-nvidia/vllm/tree/vklimkov/voicechat). I'm getting back into this PR now and should have an update for you shortly.

Signed-off-by: Viacheslav Klimkov <vklimkov@nvidia.com>
Signed-off-by: Viacheslav Klimkov <vklimkov@nvidia.com>
@vklimkov-nvidia
Author

Confirmed: since support for vllm v0.16 was merged to main, streaming input is already supported. Closing.

@ekagra-ranjan
Contributor

Hi @vklimkov-nvidia - I just had a look at the changes in your PR; it changes vllm-omni files. I am curious how upgrading to vllm 0.16 bypasses the need for your changes?

@hsliuustc0106
Collaborator

Confirmed: since support for vllm v0.16 was merged to main, streaming input is already supported. Closing.

Have you tested locally?

cc @amy-why-3459 @Gaohan123

