Cannot retrieve reasoning_content while streaming #29513

Open · yigit353 opened this issue Jan 30, 2025 · 4 comments
@yigit353

Checked other resources

  • I added a very descriptive title to this issue.
  • I searched the LangChain documentation with the integrated search.
  • I used the GitHub search to find a similar question and didn't find it.
  • I am sure that this is a bug in LangChain rather than my code.
  • The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).

Example Code

When I run this simple code:

from dotenv import load_dotenv
from langchain_core.prompts import ChatPromptTemplate
from langchain_deepseek import ChatDeepSeek

load_dotenv()

llm = ChatDeepSeek(model="deepseek-reasoner", temperature=0, streaming=True)

prompt = ChatPromptTemplate(
    [
        (
            "system",
            "You are a helpful assistant that translates {input_language} to {output_language}.",
        ),
        ("human", "{input}"),
    ]
)

params = {
    "input_language": "English",
    "output_language": "German",
    "input": "I love programming.",
}

messages = prompt.invoke(params)

for chunk in llm.stream(messages):
    print(chunk)

I get an empty reasoning_content while streaming, but I need it so that I can display the reasoning in the UI in real time.
As a workaround, I wrote my own solution:

import asyncio
from typing import Any, AsyncIterator, Optional

from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain_core.messages import AIMessageChunk
from langchain_core.outputs import ChatGenerationChunk, LLMResult
from langchain_openai import ChatOpenAI


class DeepseekChatOpenAI(ChatOpenAI):
    async def _astream(
            self,
            messages: Any,
            stop: Optional[Any] = None,
            run_manager: Optional[Any] = None,
            **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        openai_messages = []
        for msg in messages:
            if isinstance(msg, HumanMessage):
                openai_messages.append({"role": "user", "content": msg.content})
            elif isinstance(msg, AIMessage):
                openai_messages.append({"role": "assistant", "content": msg.content})
            elif isinstance(msg, SystemMessage):
                openai_messages.append({"role": "system", "content": msg.content})
            else:
                raise ValueError(f"Unsupported message type: {type(msg)}")

        params = {
            "model": self.model_name,
            "messages": openai_messages,
            **self.model_kwargs,
            **kwargs,
            "extra_body": {
                "enable_enhanced_generation": True,
                **(kwargs.get("extra_body", {})),
                **(self.model_kwargs.get("extra_body", {}))
            }
        }
        params = {k: v for k, v in params.items() if v not in (None, {}, [])}

        # Create and process the stream
        async for chunk in await self.async_client.create(
                stream=True,
                **params
        ):
            content = chunk.choices[0].delta.content or ""
            reasoning = chunk.choices[0].delta.model_extra.get("reasoning_content", "") if chunk.choices[
                0].delta.model_extra else ""
            if content:
                yield ChatGenerationChunk(
                    message=AIMessageChunk(content=content),
                    generation_info={"reasoning": reasoning}
                )
            if reasoning:
                yield ChatGenerationChunk(
                    message=AIMessageChunk(
                        content="",
                        additional_kwargs={"reasoning": reasoning}
                    ),
                    generation_info={"reasoning": reasoning}
                )

    def invoke(
            self,
            messages: Any,
            stop: Optional[Any] = None,
            run_manager: Optional[Any] = None,
            **kwargs: Any,
    ) -> AIMessage:
        async def _ainvoke():
            combined_content = []
            combined_reasoning = []
            async for chunk in self._astream(messages, stop, run_manager, **kwargs):
                if chunk.message.content:
                    combined_content.append(chunk.message.content)
                # If reasoning is in additional_kwargs, gather that too
                if "reasoning" in chunk.message.additional_kwargs:
                    combined_reasoning.append(
                        chunk.message.additional_kwargs["reasoning"]
                    )
            return AIMessage(
                content="".join(combined_content),
                additional_kwargs={"reasoning": "".join(combined_reasoning)} if combined_reasoning else {}
            )

        return asyncio.run(_ainvoke())

With this workaround I can now get the results I need.
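
For reference, a minimal usage sketch of the class above (the base_url, model name, and DEEPSEEK_API_KEY environment variable are assumptions for a DeepSeek-compatible OpenAI endpoint; adjust for your setup):

import os

# Sketch: point the workaround class at a DeepSeek-compatible endpoint (assumed settings).
llm = DeepseekChatOpenAI(
    model="deepseek-reasoner",
    api_key=os.environ["DEEPSEEK_API_KEY"],
    base_url="https://api.deepseek.com",
)

result = llm.invoke([HumanMessage(content="I love programming.")])
print(result.additional_kwargs.get("reasoning", ""))  # reasoning gathered by the workaround
print(result.content)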

I built a project around it: https://github.com/yigit353/DeepSeekRAGChat
You can read my post about it here: https://www.linkedin.com/posts/yigitbekir_github-yigit353deepseekragchat-a-streamlit-based-activity-7290487810770309120-J0Ln?utm_source=share&utm_medium=member_desktop

Thank you!

Error Message and Stack Trace (if applicable)

No response

Description

  • I expect reasoning_content to appear as a key-value pair in additional_kwargs while streaming with llm.stream for llm = ChatDeepSeek(model="deepseek-reasoner", temperature=0, streaming=True) (see the sketch below).
  • Instead, I get empty content until the reasoning finishes, and additional_kwargs is always empty.
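
For illustration, this is the consumption pattern I would expect to work (a sketch of the desired behavior, using the llm and messages from the example code above; not what currently happens):

for chunk in llm.stream(messages):
    # Desired behavior: reasoning tokens arrive incrementally in additional_kwargs
    reasoning = chunk.additional_kwargs.get("reasoning_content", "")
    if reasoning:
        print(reasoning, end="", flush=True)
    if chunk.content:
        print(chunk.content, end="", flush=True)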

System Info

System Information

OS: Windows
OS Version: 10.0.26100
Python Version: 3.11.9 (tags/v3.11.9:de54cf5, Apr 2 2024, 10:12:12) [MSC v.1938 64 bit (AMD64)]

Package Information

langchain_core: 0.3.33
langchain: 0.3.14
langchain_community: 0.3.14
langsmith: 0.2.11
langchain_chroma: 0.2.0
langchain_deepseek: Installed. No version info available.
langchain_groq: 0.2.3
langchain_huggingface: 0.1.2
langchain_ollama: 0.2.2
langchain_openai: 0.3.3
langchain_text_splitters: 0.3.5

Optional packages not installed

langserve

Other Dependencies

aiohttp: 3.11.11
async-timeout: Installed. No version info available.
chromadb: 0.5.23
dataclasses-json: 0.6.7
fastapi: 0.115.6
groq: 0.15.0
httpx: 0.27.2
httpx-sse: 0.4.0
huggingface-hub: 0.27.1
jsonpatch: 1.33
langsmith-pyo3: Installed. No version info available.
numpy: 1.26.4
ollama: 0.4.6
openai: 1.60.2
orjson: 3.10.15
packaging: 24.2
pydantic: 2.10.5
pydantic-settings: 2.7.1
PyYAML: 6.0.2
requests: 2.32.3
requests-toolbelt: 1.0.0
sentence-transformers: 3.4.0
SQLAlchemy: 2.0.37
tenacity: 9.0.0
tiktoken: 0.8.0
tokenizers: 0.21.0
transformers: 4.48.1
typing-extensions: 4.12.2
zstandard: Installed. No version info available.

@codergma

codergma commented Feb 2, 2025

Same issue here. I believe the method _convert_delta_to_message_chunk() does not retrieve the reasoning_content key.

def _convert_delta_to_message_chunk(
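
A sketch of the change this implies, inside the assistant branch of _convert_delta_to_message_chunk (essentially what the commits referenced below and the code in a later comment do; not the exact patch):

elif role == "assistant" or default_class == AIMessageChunk:
    # Sketch: copy the non-standard reasoning_content field from the raw delta
    # into additional_kwargs so it survives into the AIMessageChunk
    if reasoning_content := _dict.get("reasoning_content"):
        additional_kwargs["reasoning_content"] = reasoning_content
    return AIMessageChunk(
        content=content,
        additional_kwargs=additional_kwargs,
        id=id_,
        tool_call_chunks=tool_call_chunks,
    )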

codergma added a commit to codergma/langchain that referenced this issue Feb 2, 2025
…tent while streaming

Extract reasoning_content in the _convert_delta_to_message_chunk function
@yigit353
Author

yigit353 commented Feb 2, 2025

Same issue here. I believe the method _convert_delta_to_message_chunk() does not retrieve the reasoning_content key.

langchain/libs/partners/openai/langchain_openai/chat_models/base.py, line 262 in 0c782ee:

def _convert_delta_to_message_chunk(

You proposed a very minimal and elegant solution; however, I suspect the maintainers don't want to change the base class for this. I believe a change in the specialized ChatDeepSeek class would be better, if possible.

@maye76

maye76 commented Feb 7, 2025

Here is my solution:

import openai
from typing import Any, Dict, Iterator, List, Mapping, Optional, Type, Union, cast

from langchain_core.callbacks import CallbackManagerForLLMRun
from langchain_core.messages import (
    AIMessageChunk,
    BaseMessage,
    BaseMessageChunk,
    ChatMessageChunk,
    FunctionMessageChunk,
    HumanMessageChunk,
    SystemMessageChunk,
    ToolMessageChunk,
)
from langchain_core.messages.ai import UsageMetadata
from langchain_core.messages.tool import tool_call_chunk
from langchain_core.outputs import ChatGenerationChunk, ChatResult
from langchain_openai import ChatOpenAI
from langchain_openai.chat_models.base import (
    _create_usage_metadata,
    _handle_openai_bad_request,
    warnings,
)

def _convert_delta_to_message_chunk(
    _dict: Mapping[str, Any], default_class: Type[BaseMessageChunk]
) -> BaseMessageChunk:
    id_ = _dict.get("id")
    role = cast(str, _dict.get("role"))
    content = cast(str, _dict.get("content") or "")
    additional_kwargs: Dict = {}
    if _dict.get("function_call"):
        function_call = dict(_dict["function_call"])
        if "name" in function_call and function_call["name"] is None:
            function_call["name"] = ""
        additional_kwargs["function_call"] = function_call
    tool_call_chunks = []
    if raw_tool_calls := _dict.get("tool_calls"):
        additional_kwargs["tool_calls"] = raw_tool_calls
        try:
            tool_call_chunks = [
                tool_call_chunk(
                    name=rtc["function"].get("name"),
                    args=rtc["function"].get("arguments"),
                    id=rtc.get("id"),
                    index=rtc["index"],
                )
                for rtc in raw_tool_calls
            ]
        except KeyError:
            pass

    if role == "user" or default_class == HumanMessageChunk:
        return HumanMessageChunk(content=content, id=id_)
    elif role == "assistant" or default_class == AIMessageChunk:
        if reasoning_content := _dict.get("reasoning_content"):
            additional_kwargs["reasoning_content"] = reasoning_content
        return AIMessageChunk(
            content=content,
            additional_kwargs=additional_kwargs,
            id=id_,
            tool_call_chunks=tool_call_chunks,  # type: ignore[arg-type]
        )
    elif role in ("system", "developer") or default_class == SystemMessageChunk:
        if role == "developer":
            additional_kwargs = {"__openai_role__": "developer"}
        else:
            additional_kwargs = {}
        return SystemMessageChunk(
            content=content, id=id_, additional_kwargs=additional_kwargs
        )
    elif role == "function" or default_class == FunctionMessageChunk:
        return FunctionMessageChunk(content=content, name=_dict["name"], id=id_)
    elif role == "tool" or default_class == ToolMessageChunk:
        return ToolMessageChunk(
            content=content, tool_call_id=_dict["tool_call_id"], id=id_
        )
    elif role or default_class == ChatMessageChunk:
        return ChatMessageChunk(content=content, role=role, id=id_)
    else:
        return default_class(content=content, id=id_)  # type: ignore

def _convert_chunk_to_generation_chunk(
    chunk: dict, default_chunk_class: Type, base_generation_info: Optional[Dict]
) -> Optional[ChatGenerationChunk]:
    if chunk.get("type") == "content.delta":  # from beta.chat.completions.stream
        return None
    token_usage = chunk.get("usage")
    choices = (
        chunk.get("choices", [])
        # from beta.chat.completions.stream
        or chunk.get("chunk", {}).get("choices", [])
    )

    usage_metadata: Optional[UsageMetadata] = (
        _create_usage_metadata(token_usage) if token_usage else None
    )
    if len(choices) == 0:
        # logprobs is implicitly None
        generation_chunk = ChatGenerationChunk(
            message=default_chunk_class(content="", usage_metadata=usage_metadata)
        )
        return generation_chunk

    choice = choices[0]
    if choice["delta"] is None:
        return None

    message_chunk = _convert_delta_to_message_chunk(
        choice["delta"], default_chunk_class
    )
    generation_info = {**base_generation_info} if base_generation_info else {}

    if finish_reason := choice.get("finish_reason"):
        generation_info["finish_reason"] = finish_reason
        if model_name := chunk.get("model"):
            generation_info["model_name"] = model_name
        if system_fingerprint := chunk.get("system_fingerprint"):
            generation_info["system_fingerprint"] = system_fingerprint

    logprobs = choice.get("logprobs")
    if logprobs:
        generation_info["logprobs"] = logprobs

    if usage_metadata and isinstance(message_chunk, AIMessageChunk):
        message_chunk.usage_metadata = usage_metadata

    generation_chunk = ChatGenerationChunk(
        message=message_chunk, generation_info=generation_info or None
    )
    return generation_chunk

class ChatOpenAIReasoning(ChatOpenAI):
    def _create_chat_result(
        self,
        response: Union[dict, openai.BaseModel],
        generation_info: Optional[Dict] = None,
    ) -> ChatResult:
        rtn = super()._create_chat_result(response, generation_info)

        if not isinstance(response, openai.BaseModel):
            return rtn

        if hasattr(response.choices[0].message, "reasoning_content"):  # type: ignore
            rtn.generations[0].message.additional_kwargs["reasoning_content"] = (
                response.choices[0].message.reasoning_content  # type: ignore
            )

        return rtn
    
    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        kwargs["stream"] = True
        payload = self._get_request_payload(messages, stop=stop, **kwargs)
        default_chunk_class: Type[BaseMessageChunk] = AIMessageChunk
        base_generation_info = {}

        if "response_format" in payload:
            if self.include_response_headers:
                warnings.warn(
                    "Cannot currently include response headers when response_format is "
                    "specified."
                )
            payload.pop("stream")
            response_stream = self.root_client.beta.chat.completions.stream(**payload)
            context_manager = response_stream
        else:
            if self.include_response_headers:
                raw_response = self.client.with_raw_response.create(**payload)
                response = raw_response.parse()
                base_generation_info = {"headers": dict(raw_response.headers)}
            else:
                response = self.client.create(**payload)
            context_manager = response
        try:
            with context_manager as response:
                is_first_chunk = True
                for chunk in response:
                    if not isinstance(chunk, dict):
                        chunk = chunk.model_dump()
                    generation_chunk = _convert_chunk_to_generation_chunk(
                        chunk,
                        default_chunk_class,
                        base_generation_info if is_first_chunk else {},
                    )
                    if generation_chunk is None:
                        continue
                    default_chunk_class = generation_chunk.message.__class__
                    logprobs = (generation_chunk.generation_info or {}).get("logprobs")
                    if run_manager:
                        run_manager.on_llm_new_token(
                            generation_chunk.text,
                            chunk=generation_chunk,
                            logprobs=logprobs,
                        )
                    is_first_chunk = False
                    yield generation_chunk
        except openai.BadRequestError as e:
            _handle_openai_bad_request(e)
        if hasattr(response, "get_final_completion") and "response_format" in payload:
            final_completion = response.get_final_completion()
            generation_chunk = self._get_generation_chunk_from_completion(
                final_completion
            )
            if run_manager:
                run_manager.on_llm_new_token(
                    generation_chunk.text, chunk=generation_chunk
                )
            yield generation_chunk
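
Usage would then look roughly like this (a sketch; the base_url and api_key here are assumptions for a DeepSeek-compatible endpoint):

import os

# Sketch: stream with the patched class and read reasoning_content from additional_kwargs.
llm = ChatOpenAIReasoning(
    model="deepseek-reasoner",
    api_key=os.environ["DEEPSEEK_API_KEY"],
    base_url="https://api.deepseek.com",
    streaming=True,
)

for chunk in llm.stream("I love programming."):
    print(chunk.additional_kwargs.get("reasoning_content", ""), end="", flush=True)
    print(chunk.content, end="", flush=True)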

@EricCorleone

EricCorleone commented Feb 8, 2025

For me, everything works fine on macOS, but on Linux reasoning_content is None.
